10467: Merge branch 'master' into 10467-client-disconnect
authorTom Clegg <tom@curoverse.com>
Tue, 8 Nov 2016 20:37:26 +0000 (15:37 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 8 Nov 2016 20:37:26 +0000 (15:37 -0500)
1  2 
build/run-tests.sh
services/keepstore/s3_volume.go

diff --combined build/run-tests.sh
index e326c2287f05f3b10a178384b7fdf2b49f688647,fa608738c7e63cd3aa24ad8457fafc7bed829e84..8959cfbe09c3ea7ac6ded2142b626259787d2121
@@@ -93,6 -93,7 +93,7 @@@ sdk/go/streame
  sdk/go/crunchrunner
  sdk/cwl
  tools/crunchstat-summary
+ tools/keep-exercise
  tools/keep-rsync
  tools/keep-block-check
  
@@@ -158,8 -159,8 +159,8 @@@ sanity_checks() 
      echo -n 'go: '
      go version \
          || fatal "No go binary. See http://golang.org/doc/install"
 -    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
 -        || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
 +    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 7 ]] \
 +        || fatal "Go >= 1.7 required. See http://golang.org/doc/install"
      echo -n 'gcc: '
      gcc --version | egrep ^gcc \
          || fatal "No gcc. Try: apt-get install build-essential"
@@@ -764,8 -765,9 +765,9 @@@ gostuff=
      services/crunch-dispatch-local
      services/crunch-dispatch-slurm
      services/crunch-run
-     tools/keep-rsync
      tools/keep-block-check
+     tools/keep-exercise
+     tools/keep-rsync
      )
  for g in "${gostuff[@]}"
  do
index 33919a37e1371fa322503b3aed3add9f15bf2cf5,ed1161c38d7bf977358de642aed390b1f410e089..17923f807dc8a8f11bc77ce8dc0732001a4a8ba8
@@@ -1,14 -1,11 +1,14 @@@
  package main
  
  import (
 +      "bytes"
 +      "context"
        "encoding/base64"
        "encoding/hex"
        "flag"
        "fmt"
        "io"
 +      "io/ioutil"
        "log"
        "net/http"
        "os"
        "github.com/AdRoll/goamz/s3"
  )
  
+ const (
+       s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+       s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+ )
  var (
        // ErrS3TrashDisabled is returned by Trash if that operation
        // is impossible with the current config.
@@@ -216,10 -218,10 +221,10 @@@ func (v *S3Volume) Start() error 
        // Zero timeouts mean "wait forever", which is a bad
        // default. Default to long timeouts instead.
        if v.ConnectTimeout == 0 {
-               v.ConnectTimeout = arvados.Duration(time.Minute)
+               v.ConnectTimeout = s3DefaultConnectTimeout
        }
        if v.ReadTimeout == 0 {
-               v.ReadTimeout = arvados.Duration(10 * time.Minute)
+               v.ReadTimeout = s3DefaultReadTimeout
        }
  
        client := s3.New(auth, region)
        return nil
  }
  
 +func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
 +      ready := make(chan bool)
 +      go func() {
 +              rdr, err = v.getReader(loc)
 +              close(ready)
 +      }()
 +      select {
 +      case <-ready:
 +              return
 +      case <-ctx.Done():
 +              theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
 +              go func() {
 +                      <-ready
 +                      if err == nil {
 +                              rdr.Close()
 +                      }
 +              }()
 +              return nil, ctx.Err()
 +      }
 +}
 +
  // getReader wraps (Bucket)GetReader.
  //
  // In situations where (Bucket)GetReader would fail because the block
@@@ -285,106 -266,50 +290,106 @@@ func (v *S3Volume) getReader(loc string
  
  // Get a block: copy the block data into buf, and return the number of
  // bytes copied.
 -func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 -      rdr, err := v.getReader(loc)
 +func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 +      rdr, err := v.getReaderWithContext(ctx, loc)
        if err != nil {
                return 0, err
        }
 -      defer rdr.Close()
 -      n, err := io.ReadFull(rdr, buf)
 -      switch err {
 -      case nil, io.EOF, io.ErrUnexpectedEOF:
 -              return n, nil
 -      default:
 -              return 0, v.translateError(err)
 +
 +      var n int
 +      ready := make(chan bool)
 +      go func() {
 +              defer close(ready)
 +
 +              defer rdr.Close()
 +              n, err = io.ReadFull(rdr, buf)
 +
 +              switch err {
 +              case nil, io.EOF, io.ErrUnexpectedEOF:
 +                      err = nil
 +              default:
 +                      err = v.translateError(err)
 +              }
 +      }()
 +      select {
 +      case <-ctx.Done():
 +              theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
 +              rdr.Close()
 +              // Must wait for ReadFull to return, to ensure it
 +              // doesn't write to buf after we return.
 +              theConfig.debugLogf("s3: waiting for ReadFull() to fail")
 +              <-ready
 +              return 0, ctx.Err()
 +      case <-ready:
 +              return n, err
        }
  }
  
  // Compare the given data with the stored data.
 -func (v *S3Volume) Compare(loc string, expect []byte) error {
 -      rdr, err := v.getReader(loc)
 +func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
 +      rdr, err := v.getReaderWithContext(ctx, loc)
        if err != nil {
                return err
        }
        defer rdr.Close()
 -      return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
 +      return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
  }
  
  // Put writes a block.
 -func (v *S3Volume) Put(loc string, block []byte) error {
 +func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
        var opts s3.Options
 -      if len(block) > 0 {
 +      size := len(block)
 +      if size > 0 {
                md5, err := hex.DecodeString(loc)
                if err != nil {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
        }
 -      err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
 -      if err != nil {
 +
 +      // Send the block data through a pipe, so that (if we need to)
 +      // we can close the pipe early and abandon our PutReader()
 +      // goroutine, without worrying about PutReader() accessing our
 +      // block buffer after we release it.
 +      bufr, bufw := io.Pipe()
 +      go func() {
 +              io.Copy(bufw, bytes.NewReader(block))
 +              bufw.Close()
 +      }()
 +
 +      var err error
 +      ready := make(chan bool)
 +      go func() {
 +              defer func() {
 +                      if ctx.Err() != nil {
 +                              theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
 +                      }
 +              }()
 +              defer close(ready)
 +              err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
 +              if err != nil {
 +                      return
 +              }
 +              err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
 +      }()
 +      select {
 +      case <-ctx.Done():
 +              theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
 +              // Our pipe might be stuck in Write(), waiting for
 +              // io.Copy() to read. If so, un-stick it. This means
 +              // PutReader will get corrupt data, but that's OK: the
 +              // size and MD5 won't match, so the write will fail.
 +              go io.Copy(ioutil.Discard, bufr)
 +              // CloseWithError() will return once pending I/O is done.
 +              bufw.CloseWithError(ctx.Err())
 +              theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
 +              return ctx.Err()
 +      case <-ready:
                return v.translateError(err)
        }
 -      err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
 -      return v.translateError(err)
  }
  
  // Touch sets the timestamp for the given locator to the current time.