16535: Support Sync() on customfilesystem.
author Tom Clegg <tom@tomclegg.ca>
Wed, 15 Jul 2020 16:15:38 +0000 (12:15 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Wed, 15 Jul 2020 16:15:38 +0000 (12:15 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_lookup.go
sdk/go/arvados/fs_project_test.go
sdk/go/arvados/fs_site_test.go

index f01369a885ece3b7c315c832b114caaf77715862..59a6a6ba825e57928e9348c17d971988fa24fc94 100644 (file)
@@ -7,7 +7,6 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
-       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -33,6 +32,9 @@ type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
        onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
+       authToken   string               // client's auth token (used for signing locators)
+       sigkey      string               // blob signing key
+       sigttl      time.Duration        // blob signing ttl
        sync.RWMutex
 }
 
@@ -49,7 +51,7 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
 }
 
 func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
-       locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+       locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
        buf := make([]byte, len(p))
        copy(buf, p)
        if kcs.onPut != nil {
@@ -61,9 +63,12 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        return locator, 1, nil
 }
 
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
 
 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       if strings.Contains(locator, "+A") {
+               return locator, nil
+       }
        kcs.Lock()
        defer kcs.Unlock()
        if strings.Contains(locator, "+R") {
@@ -74,8 +79,9 @@ func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
                        return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
                }
        }
-       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
-       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+       locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+       locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+       return locator, nil
 }
 
 type CollectionFSSuite struct {
@@ -92,7 +98,11 @@ func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
                        "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-               }}
+               },
+               sigkey:    fixtureBlobSigningKey,
+               sigttl:    fixtureBlobSigningTTL,
+               authToken: fixtureActiveToken,
+       }
        s.fs, err = s.coll.FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
 }
index ad44ef22df938c04bca22f54bf25f52715b2625d..cb4ccfcf92b7d2f90c14d7b7856b717b55c19f92 100644 (file)
@@ -26,6 +26,20 @@ type lookupnode struct {
        staleOne  map[string]time.Time
 }
 
+// Sync flushes pending writes for loaded children via treenode.Sync. On failure it
+// returns the error unchanged; on success it resets staleAll and staleOne so the next lookup reloads.
+func (ln *lookupnode) Sync() error {
+       err := ln.treenode.Sync()
+       if err != nil {
+               return err
+       }
+       ln.staleLock.Lock()
+       ln.staleAll = time.Time{}
+       ln.staleOne = nil
+       ln.staleLock.Unlock()
+       return nil
+}
+
 func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
        ln.staleLock.Lock()
        defer ln.staleLock.Unlock()
index 5628dcc9c43b42ef6560ddfa1eba2bc0482001d3..dd35323b7736686daf488d8bac2ff0ddd40bd97a 100644 (file)
@@ -199,6 +199,22 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        err = wf.Close()
        c.Check(err, check.IsNil)
 
+       err = project.Sync()
+       c.Check(err, check.IsNil)
+       _, err = s.fs.Open("/home/A Project/oob/test.txt")
+       c.Check(err, check.IsNil)
+
+       // Sync again to mark the project dir as stale, so the
+       // collection gets reloaded from the controller on next
+       // lookup.
+       err = project.Sync()
+       c.Check(err, check.IsNil)
+
+       // Ensure collection was flushed by Sync
+       var latest Collection
+       err = s.client.RequestAndDecode(&latest, "GET", "arvados/v1/collections/"+oob.UUID, nil, nil)
+       c.Check(latest.ManifestText, check.Matches, `.*:test.txt.*\n`)
+
        // Delete test.txt behind s.fs's back by updating the
        // collection record with an empty ManifestText.
        err = s.client.RequestAndDecode(nil, "PATCH", "arvados/v1/collections/"+oob.UUID, nil, map[string]interface{}{
@@ -209,8 +225,6 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        })
        c.Assert(err, check.IsNil)
 
-       err = project.Sync()
-       c.Check(err, check.IsNil)
        _, err = s.fs.Open("/home/A Project/oob/test.txt")
        c.Check(err, check.NotNil)
        _, err = s.fs.Open("/home/A Project/oob")
@@ -220,7 +234,7 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        c.Assert(err, check.IsNil)
 
        err = project.Sync()
-       c.Check(err, check.IsNil)
+       c.Check(err, check.NotNil) // can't update the deleted collection
        _, err = s.fs.Open("/home/A Project/oob")
-       c.Check(err, check.NotNil)
+       c.Check(err, check.IsNil) // parent dir still has old collection -- didn't reload, because Sync failed
 }
index fff0b7e010b22b1811991ce3b6249093c50b616b..96ed74de84cdc45b9c14b0ee3a782c97330d2a78 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "net/http"
        "os"
+       "time"
 
        check "gopkg.in/check.v1"
 )
@@ -22,6 +23,8 @@ const (
        fixtureFooCollectionPDH        = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
        fixtureFooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
        fixtureNonexistentCollection   = "zzzzz-4zz18-totallynotexist"
+       fixtureBlobSigningKey          = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+       fixtureBlobSigningTTL          = 336 * time.Hour
 )
 
 var _ = check.Suite(&SiteFSSuite{})
@@ -41,7 +44,11 @@ func (s *SiteFSSuite) SetUpTest(c *check.C) {
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
                        "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-               }}
+               },
+               sigkey:    fixtureBlobSigningKey,
+               sigttl:    fixtureBlobSigningTTL,
+               authToken: fixtureActiveToken,
+       }
        s.fs = s.client.SiteFileSystem(s.kc)
 }