Merge branch '17816-singularity-cwd' into main refs #17816
author    Peter Amstutz <peter.amstutz@curii.com>
          Fri, 16 Jul 2021 15:27:06 +0000 (11:27 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
          Fri, 16 Jul 2021 15:27:06 +0000 (11:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
sdk/go/arvados/api.go
sdk/go/arvados/fs_backend.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site_test.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 08e4aa3899ec46ca68d86d5ef1ea930057830c00..412f1bbfbfa95027eb5c043c5e1fcf07449139b0 100644
@@ -55,7 +55,7 @@ var ErrCancelled = errors.New("Cancelled")
 
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
-       PutB(buf []byte) (string, int, error)
+       BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
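
crunch-run's Keep dependency now goes through the richer BlockWrite call instead of PutB. A minimal sketch of how a call site migrates (hypothetical function and variable names, assuming the crunchrun package's existing context and arvados imports):

    func saveLog(ctx context.Context, kc IKeepClient, logData []byte) (string, error) {
            // Before this commit: locator, _, err := kc.PutB(logData)
            resp, err := kc.BlockWrite(ctx, arvados.BlockWriteOptions{Data: logData})
            if err != nil {
                    return "", err
            }
            // resp.Locator carries the same "hash+size" locator PutB returned.
            return resp.Locator, nil
    }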
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 22d65334ed3bc2a30228c6ab5c55dbd51e674f6c..bb7ffdf0306b26b2f5c56062aaaaaf7b256e5447 100644
@@ -305,9 +305,11 @@ func (client *KeepTestClient) LocalLocator(locator string) (string, error) {
        return locator, nil
 }
 
-func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
-       client.Content = buf
-       return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
+func (client *KeepTestClient) BlockWrite(_ context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+       client.Content = opts.Data
+       return arvados.BlockWriteResponse{
+               Locator: fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)),
+       }, nil
 }
 
 func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
@@ -453,8 +455,8 @@ func (*KeepErrorTestClient) ManifestFileReader(manifest.Manifest, string) (arvad
        return nil, errors.New("KeepError")
 }
 
-func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
-       return "", 0, errors.New("KeepError")
+func (*KeepErrorTestClient) BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+       return arvados.BlockWriteResponse{}, errors.New("KeepError")
 }
 
 func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index 4e0348c083df7617442fd6c8bfe133264c91f56f..a57f2a6838bb2b9cd034ca00b245cae41157cd74 100644
@@ -8,6 +8,7 @@ import (
        "bufio"
        "context"
        "encoding/json"
+       "io"
        "net"
 
        "github.com/sirupsen/logrus"
@@ -205,6 +206,22 @@ type LogoutOptions struct {
        ReturnTo string `json:"return_to"` // Redirect to this URL after logging out
 }
 
+type BlockWriteOptions struct {
+       Hash           string
+       Data           []byte
+       Reader         io.Reader
+       DataSize       int // Must be set if Data is nil.
+       RequestID      string
+       StorageClasses []string
+       Replicas       int
+       Attempts       int
+}
+
+type BlockWriteResponse struct {
+       Locator  string
+       Replicas int
+}
+
 type API interface {
        ConfigGet(ctx context.Context) (json.RawMessage, error)
        Login(ctx context.Context, options LoginOptions) (LoginResponse, error)
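
BlockWriteOptions accepts two input forms: an in-memory Data slice, or a streaming Reader with an explicit DataSize and, in practice, a precomputed Hash to verify against, mirroring the PutHB and PutHR wrappers further down. A hedged usage sketch, assuming ctx, a client kc implementing this method, and the context, strings, and arvados imports in scope:

    // In-memory form: Hash is optional; the keepclient implementation
    // computes the MD5 hash from Data when Hash is empty.
    resp, err := kc.BlockWrite(ctx, arvados.BlockWriteOptions{
            Data: []byte("foo"),
    })

    // Streaming form: Data is nil, so DataSize is required, and the
    // supplied Hash is checked against the bytes read from Reader.
    resp, err = kc.BlockWrite(ctx, arvados.BlockWriteOptions{
            Hash:     "acbd18db4cc2f85cedef654fccc4a4d8", // md5("foo")
            Reader:   strings.NewReader("foo"),
            DataSize: 3,
    })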
diff --git a/sdk/go/arvados/fs_backend.go b/sdk/go/arvados/fs_backend.go
index c8308aea59e94d06be2e94235b2f0bda4f16f8c0..32365a5317ec79d50dd7f47f71359bcd6536f881 100644
@@ -4,7 +4,10 @@
 
 package arvados
 
-import "io"
+import (
+       "context"
+       "io"
+)
 
 type fsBackend interface {
        keepClient
@@ -20,7 +23,7 @@ type keepBackend struct {
 
 type keepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
-       PutB(p []byte) (string, int, error)
+       BlockWrite(context.Context, BlockWriteOptions) (BlockWriteResponse, error)
        LocalLocator(locator string) (string, error)
 }
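
Any backend handed to the filesystem layer must now implement BlockWrite rather than PutB. A minimal conforming stub, sketched as if inside package arvados so the option types are unqualified (hypothetical type; it needs the context, crypto/md5, fmt, and io imports; the test clients updated elsewhere in this commit follow the same pattern):

    type stubKeep struct{}

    func (stubKeep) ReadAt(locator string, p []byte, off int) (int, error) {
            return 0, io.EOF // nothing stored
    }

    func (stubKeep) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
            // Return a bare "hash+size" locator for the supplied data.
            return BlockWriteResponse{
                    Locator:  fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)),
                    Replicas: 1,
            }, nil
    }

    func (stubKeep) LocalLocator(locator string) (string, error) {
            return locator, nil
    }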
 
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index b743ab368e33f69a5c1710d63dc410af8a380ffc..4d9db421fc3838b268fdeaeea1b81b9ca1192843 100644
@@ -42,7 +42,9 @@ type CollectionFileSystem interface {
 
 type collectionFileSystem struct {
        fileSystem
-       uuid string
+       uuid           string
+       replicas       int
+       storageClasses []string
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
@@ -52,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                modTime = time.Now()
        }
        fs := &collectionFileSystem{
-               uuid: c.UUID,
+               uuid:           c.UUID,
+               storageClasses: c.StorageClassesDesired,
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: client, keepClient: kc},
                        thr:       newThrottle(concurrentWriters),
                },
        }
+       if r := c.ReplicationDesired; r != nil {
+               fs.replicas = *r
+       }
        root := &dirnode{
                fs: fs,
                treenode: treenode{
@@ -321,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 // filenode implements inode.
 type filenode struct {
        parent   inode
-       fs       FileSystem
+       fs       *collectionFileSystem
        fileinfo fileinfo
        segments []segment
        // number of times `segments` has changed in a
@@ -610,7 +616,11 @@ func (fn *filenode) pruneMemSegments() {
                fn.fs.throttle().Acquire()
                go func() {
                        defer close(done)
-                       locator, _, err := fn.FS().PutB(buf)
+                       resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+                               Data:           buf,
+                               Replicas:       fn.fs.replicas,
+                               StorageClasses: fn.fs.storageClasses,
+                       })
                        fn.fs.throttle().Release()
                        fn.Lock()
                        defer fn.Unlock()
@@ -631,7 +641,7 @@ func (fn *filenode) pruneMemSegments() {
                        fn.memsize -= int64(len(buf))
                        fn.segments[idx] = storedSegment{
                                kc:      fn.FS(),
-                               locator: locator,
+                               locator: resp.Locator,
                                size:    len(buf),
                                offset:  0,
                                length:  len(buf),
@@ -748,7 +758,11 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
        go func() {
                defer close(done)
                defer close(errs)
-               locator, _, err := dn.fs.PutB(block)
+               resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+                       Data:           block,
+                       Replicas:       dn.fs.replicas,
+                       StorageClasses: dn.fs.storageClasses,
+               })
                dn.fs.throttle().Release()
                if err != nil {
                        errs <- err
@@ -780,7 +794,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
                        data := ref.fn.segments[ref.idx].(*memSegment).buf
                        ref.fn.segments[ref.idx] = storedSegment{
                                kc:      dn.fs,
-                               locator: locator,
+                               locator: resp.Locator,
                                size:    blocksize,
                                offset:  offsets[idx],
                                length:  len(data),
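
Taken together, these changes make a collection's desired replication and storage classes ride along on every block flush, whether triggered by pruneMemSegments or commitBlock. An illustrative sketch with hypothetical values (client and kc stand for any suitable API and Keep clients):

    two := 2
    coll := &arvados.Collection{
            StorageClassesDesired: []string{"archive"},
            ReplicationDesired:    &two,
    }
    fs, err := coll.FileSystem(client, kc)
    // Writes through fs are now flushed via BlockWrite with
    // Replicas: 2 and StorageClasses: []string{"archive"}.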
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 05c8ea61a14500466ff4bc424b8847788408404b..c032b07166fa6abd985f6c902c07c9e4c6e37f25 100644
@@ -6,6 +6,7 @@ package arvados
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -31,7 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
-       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
+       onWrite     func(bufcopy []byte) // called from BlockWrite, before acquiring lock
        authToken   string               // client's auth token (used for signing locators)
        sigkey      string               // blob signing key
        sigttl      time.Duration        // blob signing ttl
@@ -50,17 +51,25 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
        return copy(p, buf[off:]), nil
 }
 
-func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
-       locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
-       buf := make([]byte, len(p))
-       copy(buf, p)
-       if kcs.onPut != nil {
-               kcs.onPut(buf)
+func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+       if opts.Data == nil {
+               panic("oops, stub is not made for this")
+       }
+       locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+       buf := make([]byte, len(opts.Data))
+       copy(buf, opts.Data)
+       if kcs.onWrite != nil {
+               kcs.onWrite(buf)
+       }
+       for _, sc := range opts.StorageClasses {
+               if sc != "default" {
+                       return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
+               }
        }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
-       return locator, 1, nil
+       return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
 }
 
 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
@@ -112,6 +121,22 @@ func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
        c.Check(ok, check.Equals, true)
 }
 
+func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
+       fs, err := (&Collection{
+               StorageClassesDesired: []string{"unobtainium"},
+       }).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte("food"))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       _, err = fs.MarshalManifest(".")
+       c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
+}
+
 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
        fs, err := (&Collection{
                ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
@@ -1061,7 +1086,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
        proceed := make(chan struct{})
        var started, concurrent int32
        blk2done := false
-       s.kc.onPut = func([]byte) {
+       s.kc.onWrite = func([]byte) {
                atomic.AddInt32(&concurrent, 1)
                switch atomic.AddInt32(&started, 1) {
                case 1:
@@ -1127,7 +1152,7 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
 
-       s.kc.onPut = func([]byte) {
+       s.kc.onWrite = func([]byte) {
                // discard flushed data -- otherwise the stub will use
                // unlimited memory
                time.Sleep(time.Millisecond)
@@ -1171,7 +1196,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        c.Assert(err, check.IsNil)
 
        var flushed int64
-       s.kc.onPut = func(p []byte) {
+       s.kc.onWrite = func(p []byte) {
                atomic.AddInt64(&flushed, int64(len(p)))
        }
 
@@ -1239,7 +1264,7 @@ func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
        time.AfterFunc(10*time.Second, func() { close(timeout) })
        var putCount, concurrency int64
        var unflushed int64
-       s.kc.onPut = func(p []byte) {
+       s.kc.onWrite = func(p []byte) {
                defer atomic.AddInt64(&unflushed, -int64(len(p)))
                cur := atomic.AddInt64(&concurrency, 1)
                defer atomic.AddInt64(&concurrency, -1)
@@ -1302,7 +1327,7 @@ func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
        })
 
        wrote := 0
-       s.kc.onPut = func(p []byte) {
+       s.kc.onWrite = func(p []byte) {
                s.kc.Lock()
                s.kc.blocks = map[string][]byte{}
                wrote++
@@ -1333,7 +1358,7 @@ func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
-       s.kc.onPut = func([]byte) {
+       s.kc.onWrite = func([]byte) {
                s.kc.Lock()
                s.kc.blocks = map[string][]byte{}
                s.kc.Unlock()
diff --git a/sdk/go/arvados/fs_site_test.go b/sdk/go/arvados/fs_site_test.go
index dc432114a60ec58e4925be28ddbbc227ce4188e9..3c7c146f6975f26e36e9966ee0c17be7171a9dc6 100644
@@ -16,18 +16,19 @@ const (
        // Importing arvadostest would be an import cycle, so these
        // fixtures are duplicated here [until fs moves to a separate
        // package].
-       fixtureActiveToken             = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
-       fixtureAProjectUUID            = "zzzzz-j7d0g-v955i6s2oi1cbso"
-       fixtureThisFilterGroupUUID     = "zzzzz-j7d0g-thisfiltergroup"
-       fixtureAFilterGroupTwoUUID     = "zzzzz-j7d0g-afiltergrouptwo"
-       fixtureAFilterGroupThreeUUID   = "zzzzz-j7d0g-filtergroupthre"
-       fixtureFooAndBarFilesInDirUUID = "zzzzz-4zz18-foonbarfilesdir"
-       fixtureFooCollectionName       = "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
-       fixtureFooCollectionPDH        = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
-       fixtureFooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
-       fixtureNonexistentCollection   = "zzzzz-4zz18-totallynotexist"
-       fixtureBlobSigningKey          = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
-       fixtureBlobSigningTTL          = 336 * time.Hour
+       fixtureActiveToken                  = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       fixtureAProjectUUID                 = "zzzzz-j7d0g-v955i6s2oi1cbso"
+       fixtureThisFilterGroupUUID          = "zzzzz-j7d0g-thisfiltergroup"
+       fixtureAFilterGroupTwoUUID          = "zzzzz-j7d0g-afiltergrouptwo"
+       fixtureAFilterGroupThreeUUID        = "zzzzz-j7d0g-filtergroupthre"
+       fixtureFooAndBarFilesInDirUUID      = "zzzzz-4zz18-foonbarfilesdir"
+       fixtureFooCollectionName            = "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
+       fixtureFooCollectionPDH             = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
+       fixtureFooCollection                = "zzzzz-4zz18-fy296fx3hot09f7"
+       fixtureNonexistentCollection        = "zzzzz-4zz18-totallynotexist"
+       fixtureStorageClassesDesiredArchive = "zzzzz-4zz18-3t236wr12769qqa"
+       fixtureBlobSigningKey               = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+       fixtureBlobSigningTTL               = 336 * time.Hour
 )
 
 var _ = check.Suite(&SiteFSSuite{})
@@ -77,6 +78,17 @@ func (s *SiteFSSuite) TestByIDEmpty(c *check.C) {
        c.Check(len(fis), check.Equals, 0)
 }
 
+func (s *SiteFSSuite) TestUpdateStorageClasses(c *check.C) {
+       f, err := s.fs.OpenFile("/by_id/"+fixtureStorageClassesDesiredArchive+"/newfile", os.O_CREATE|os.O_RDWR, 0777)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte("nope"))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       err = s.fs.Sync()
+       c.Assert(err, check.ErrorMatches, `.*stub does not write storage class "archive"`)
+}
+
 func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
        f, err := s.fs.Open("/by_id")
        c.Assert(err, check.IsNil)
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2b560cff57b084786bca118f12609f00128d2623..2cd6bb4d43ec5e675f5eca46963e2f9d620de0f6 100644
@@ -8,6 +8,7 @@ package keepclient
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -21,8 +22,8 @@ import (
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
-       "git.arvados.org/arvados.git/sdk/go/asyncbuf"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
@@ -153,23 +154,12 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
 // Returns an InsufficientReplicasError if 0 <= replicas <
 // kc.Wants_replicas.
 func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
-       // Buffer for reads from 'r'
-       var bufsize int
-       if dataBytes > 0 {
-               if dataBytes > BLOCKSIZE {
-                       return "", 0, ErrOversizeBlock
-               }
-               bufsize = int(dataBytes)
-       } else {
-               bufsize = BLOCKSIZE
-       }
-
-       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
-       go func() {
-               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
-               buf.CloseWithError(err)
-       }()
-       return kc.putReplicas(hash, buf.NewReader, dataBytes)
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Hash:     hash,
+               Reader:   r,
+               DataSize: int(dataBytes),
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -177,16 +167,21 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
-       return kc.putReplicas(hash, newReader, int64(len(buf)))
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Hash: hash,
+               Data: buf,
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
-       hash := fmt.Sprintf("%x", md5.Sum(buffer))
-       return kc.PutHB(hash, buffer)
+       resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+               Data: buffer,
+       })
+       return resp.Locator, resp.Replicas, err
 }
 
 // PutR writes a block to Keep. It first reads all data from r into a buffer
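
PutHR, PutHB, and PutB survive as thin compatibility wrappers, so existing callers keep their behavior. For example, these two calls are now equivalent (sketch; data is any []byte no larger than BLOCKSIZE):

    locator, replicas, err := kc.PutB(data)

    resp, err2 := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{Data: data})
    // resp.Locator matches locator, resp.Replicas matches replicas,
    // and err2 carries the same error behavior as err.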
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index f59d16fd3d05a6409d560ba87ca076a99d105c1c..c52e07b8f6ea3b6ea417f9843049e3d99b986fa5 100644
@@ -6,6 +6,7 @@ package keepclient
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -20,6 +21,7 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        . "gopkg.in/check.v1"
@@ -173,7 +175,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
-                       go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+                       go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -229,7 +231,7 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
 
                UploadToStubHelper(c, st,
                        func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
-                               go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+                               go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
 
                                writer.Write([]byte("foo"))
                                writer.Close()
@@ -244,19 +246,25 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
        nServers := 5
        for _, trial := range []struct {
-               replicas    int
-               classes     []string
-               minRequests int
-               maxRequests int
-               success     bool
+               replicas      int
+               clientClasses []string
+               putClasses    []string // putClasses takes precedence over clientClasses
+               minRequests   int
+               maxRequests   int
+               success       bool
        }{
-               {1, []string{"class1"}, 1, 1, true},
-               {2, []string{"class1"}, 1, 2, true},
-               {3, []string{"class1"}, 2, 3, true},
-               {1, []string{"class1", "class2"}, 1, 1, true},
-               {nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
-               {1, []string{"class404"}, nServers, nServers, false},
-               {1, []string{"class1", "class404"}, nServers, nServers, false},
+               {1, []string{"class1"}, nil, 1, 1, true},
+               {2, []string{"class1"}, nil, 1, 2, true},
+               {3, []string{"class1"}, nil, 2, 3, true},
+               {1, []string{"class1", "class2"}, nil, 1, 1, true},
+               {3, nil, []string{"class1"}, 2, 3, true},
+               {1, nil, []string{"class1", "class2"}, 1, 1, true},
+               {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
+               {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
+               {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
+               {1, []string{"class404"}, nil, nServers, nServers, false},
+               {1, []string{"class1", "class404"}, nil, nServers, nServers, false},
+               {1, nil, []string{"class1", "class404"}, nServers, nServers, false},
        } {
                c.Logf("%+v", trial)
                st := &StubPutHandler{
@@ -272,7 +280,7 @@ func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
                arv, _ := arvadosclient.MakeArvadosClient()
                kc, _ := MakeKeepClient(arv)
                kc.Want_replicas = trial.replicas
-               kc.StorageClasses = trial.classes
+               kc.StorageClasses = trial.clientClasses
                arv.ApiToken = "abc123"
                localRoots := make(map[string]string)
                writableLocalRoots := make(map[string]string)
@@ -283,7 +291,10 @@ func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
                }
                kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
 
-               _, _, err := kc.PutB([]byte("foo"))
+               _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+                       Data:           []byte("foo"),
+                       StorageClasses: trial.putClasses,
+               })
                if trial.success {
                        c.Check(err, check.IsNil)
                } else {
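
The new trial fields encode the precedence rule: request-level putClasses override the client-level clientClasses, which apply only when the request leaves StorageClasses nil. In sketch form:

    kc.StorageClasses = []string{"class404"} // client-level default

    // Request-level classes win; "class404" is never attempted.
    _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
            Data:           []byte("foo"),
            StorageClasses: []string{"class1", "class2"},
    })

    // With no request-level classes, kc.StorageClasses applies.
    _, err = kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
            Data: []byte("foo"),
    })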
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 7b2e47ff8042e379c1ac01825f4060011c81b3f9..a8c82aac0e70370dced92b3dd3f5bae249cb100c 100644
@@ -5,6 +5,8 @@
 package keepclient
 
 import (
+       "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -16,7 +18,9 @@ import (
        "strconv"
        "strings"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/asyncbuf"
 )
 
 // DebugPrintf emits debug messages. The easiest way to enable
@@ -58,7 +62,7 @@ type uploadStatus struct {
 }
 
 func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
-       uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
+       uploadStatusChan chan<- uploadStatus, expectedLength int, reqid string) {
 
        var req *http.Request
        var err error
@@ -69,7 +73,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
                return
        }
 
-       req.ContentLength = expectedLength
+       req.ContentLength = int64(expectedLength)
        if expectedLength > 0 {
                req.Body = ioutil.NopCloser(body)
        } else {
@@ -123,15 +127,57 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
        }
 }
 
-func (kc *KeepClient) putReplicas(
-       hash string,
-       getReader func() io.Reader,
-       expectedLength int64) (locator string, replicas int, err error) {
-
-       reqid := kc.getRequestID()
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+       var resp arvados.BlockWriteResponse
+       var getReader func() io.Reader
+       if req.Data == nil && req.Reader == nil {
+               return resp, errors.New("invalid BlockWriteOptions: Data and Reader are both nil")
+       }
+       if req.DataSize < 0 {
+               return resp, fmt.Errorf("invalid BlockWriteOptions: negative DataSize %d", req.DataSize)
+       }
+       if req.DataSize > BLOCKSIZE || len(req.Data) > BLOCKSIZE {
+               return resp, ErrOversizeBlock
+       }
+       if req.Data != nil {
+               if req.DataSize > len(req.Data) {
+                       return resp, errors.New("invalid BlockWriteOptions: DataSize > len(Data)")
+               }
+               if req.DataSize == 0 {
+                       req.DataSize = len(req.Data)
+               }
+               getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
+       } else {
+               buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+               go func() {
+                       _, err := io.Copy(buf, HashCheckingReader{req.Reader, md5.New(), req.Hash})
+                       buf.CloseWithError(err)
+               }()
+               getReader = buf.NewReader
+       }
+       if req.Hash == "" {
+               m := md5.New()
+               _, err := io.Copy(m, getReader())
+               if err != nil {
+                       return resp, err
+               }
+               req.Hash = fmt.Sprintf("%x", m.Sum(nil))
+       }
+       if req.StorageClasses == nil {
+               req.StorageClasses = kc.StorageClasses
+       }
+       if req.Replicas == 0 {
+               req.Replicas = kc.Want_replicas
+       }
+       if req.RequestID == "" {
+               req.RequestID = kc.getRequestID()
+       }
+       if req.Attempts == 0 {
+               req.Attempts = 1 + kc.Retries
+       }
 
        // Calculate the ordering for uploading to servers
-       sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots()
+       sv := NewRootSorter(kc.WritableLocalRoots(), req.Hash).GetSortedRoots()
 
        // The next server to try contacting
        nextServer := 0
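
Unset options fall back to client-level configuration, so a zero-valued BlockWriteOptions plus Data reproduces the old putReplicas behavior:

    // With only Data set, the defaults filled in above apply:
    //   StorageClasses <- kc.StorageClasses
    //   Replicas       <- kc.Want_replicas
    //   RequestID      <- kc.getRequestID()
    //   Attempts       <- 1 + kc.Retries
    resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{Data: buf})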
@@ -153,20 +199,18 @@ func (kc *KeepClient) putReplicas(
                }()
        }()
 
-       replicasWanted := kc.Want_replicas
        replicasTodo := map[string]int{}
-       for _, c := range kc.StorageClasses {
-               replicasTodo[c] = replicasWanted
+       for _, c := range req.StorageClasses {
+               replicasTodo[c] = req.Replicas
        }
-       replicasDone := 0
 
        replicasPerThread := kc.replicasPerService
        if replicasPerThread < 1 {
                // unlimited or unknown
-               replicasPerThread = replicasWanted
+               replicasPerThread = req.Replicas
        }
 
-       retriesRemaining := 1 + kc.Retries
+       retriesRemaining := req.Attempts
        var retryServers []string
 
        lastError := make(map[string]string)
@@ -190,7 +234,7 @@ func (kc *KeepClient) putReplicas(
                                }
                        }
                        if !trackingClasses {
-                               maxConcurrency = replicasWanted - replicasDone
+                               maxConcurrency = req.Replicas - resp.Replicas
                        }
                        if maxConcurrency < 1 {
                                // If there are no non-zero entries in
@@ -200,8 +244,8 @@ func (kc *KeepClient) putReplicas(
                        for active*replicasPerThread < maxConcurrency {
                                // Start some upload requests
                                if nextServer < len(sv) {
-                                       DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
-                                       go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
+                                       DebugPrintf("DEBUG: [%s] Begin upload %s to %s", req.RequestID, req.Hash, sv[nextServer])
+                                       go kc.uploadToKeepServer(sv[nextServer], req.Hash, classesTodo, getReader(), uploadStatusChan, req.DataSize, req.RequestID)
                                        nextServer++
                                        active++
                                } else {
@@ -211,13 +255,13 @@ func (kc *KeepClient) putReplicas(
                                                        msg += resp + "; "
                                                }
                                                msg = msg[:len(msg)-2]
-                                               return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
+                                               return resp, InsufficientReplicasError(errors.New(msg))
                                        }
                                        break
                                }
                        }
 
-                       DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+                       DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", req.RequestID, replicasTodo, active)
                        if active < 1 {
                                break
                        }
@@ -228,7 +272,7 @@ func (kc *KeepClient) putReplicas(
 
                        if status.statusCode == http.StatusOK {
                                delete(lastError, status.url)
-                               replicasDone += status.replicasStored
+                               resp.Replicas += status.replicasStored
                                if len(status.classesStored) == 0 {
                                        // Server doesn't report
                                        // storage classes. Give up
@@ -244,7 +288,7 @@ func (kc *KeepClient) putReplicas(
                                                delete(replicasTodo, className)
                                        }
                                }
-                               locator = status.response
+                               resp.Locator = status.response
                        } else {
                                msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
                                if len(msg) > 100 {
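
replicasTodo now tracks remaining replicas per storage class rather than a single counter: each successful response decrements the classes the server confirmed, and the loop continues until the map is empty (or, for servers that report no classes, until the plain replica count is met). A worked sketch with hypothetical numbers, mirroring the accounting above:

    // Want 2 replicas in each of two storage classes:
    replicasTodo := map[string]int{"class1": 2, "class2": 2}

    // One server responds 200 and confirms 2 replicas of "class1" only
    // (via the confirmed-classes header parsed below):
    for class, stored := range map[string]int{"class1": 2} {
            if replicasTodo[class] > stored {
                    replicasTodo[class] -= stored
            } else {
                    delete(replicasTodo, class)
            }
    }
    // replicasTodo == {"class2": 2}; uploads continue for class2.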
@@ -264,7 +308,7 @@ func (kc *KeepClient) putReplicas(
                sv = retryServers
        }
 
-       return locator, replicasDone, nil
+       return resp, nil
 }
 
 func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {