16727: Refresh block permission signatures on Sync and Read.
author Tom Clegg <tom@curii.com>
Tue, 25 Jan 2022 16:15:51 +0000 (11:15 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 27 Jan 2022 15:36:33 +0000 (10:36 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/blob_signature.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_project_test.go

index 47b31a18e893d0a848f39d0b9a16e5e736c84c71..9a031face2e957d91eed93982817fa8e1dc67d7f 100644 (file)
@@ -16,6 +16,7 @@ import (
        "fmt"
        "regexp"
        "strconv"
+       "strings"
        "time"
 )
 
@@ -126,3 +127,21 @@ func parseHexTimestamp(timestampHex string) (ts time.Time, err error) {
        }
        return ts, err
 }
+
+// errNoSignature is returned by signatureExpiryTime when the given
+// locator does not match SignedLocatorRe.
+var errNoSignature = errors.New("locator has no signature")
+
+// signatureExpiryTime returns the expiry timestamp carried in the
+// permission signature of signedLocator. It returns errNoSignature
+// if signedLocator does not match SignedLocatorRe; otherwise it
+// returns whatever parseHexTimestamp returns for the timestamp
+// field.
+func signatureExpiryTime(signedLocator string) (time.Time, error) {
+       m := SignedLocatorRe.FindStringSubmatch(signedLocator)
+       if m == nil {
+               return time.Time{}, errNoSignature
+       }
+       // Submatch 7 of SignedLocatorRe is taken to be the
+       // hex-encoded expiry timestamp.
+       return parseHexTimestamp(m[7])
+}
+
+// stripAllHints returns the part of locator that precedes its first
+// '+', i.e., the locator with all hints removed. A locator that
+// contains no '+', or that begins with '+', is returned unchanged.
+func stripAllHints(locator string) string {
+       plus := strings.IndexByte(locator, '+')
+       if plus <= 0 {
+               return locator
+       }
+       return locator[:plus]
+}
index 2b5df76ad6a12d7e8e557efad006f3aa25f128d5..d39805f3f3a773271825c68737645a0dcc7887fc 100644 (file)
@@ -43,9 +43,13 @@ type CollectionFileSystem interface {
 
 type collectionFileSystem struct {
        fileSystem
-       uuid           string
-       replicas       int
-       storageClasses []string
+       uuid              string
+       savedPDH          atomic.Value
+       replicas          int
+       storageClasses    []string
+       guessSignatureTTL time.Duration
+       holdCheckChanges  time.Time
+       lockCheckChanges  sync.Mutex
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
@@ -62,6 +66,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                        thr:       newThrottle(concurrentWriters),
                },
        }
+       fs.savedPDH.Store(c.PortableDataHash)
        if r := c.ReplicationDesired; r != nil {
                fs.replicas = *r
        }
@@ -85,18 +90,109 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
        return fs, nil
 }
 
-func backdateTree(n inode, modTime time.Time) {
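+// eachNode calls ffunc for n if n is a filenode. If n is a dirnode,
+// it calls dfunc for n and then recurses into each of its inodes.
+// A nil ffunc or dfunc is skipped.
+//
+// Caller must have lock (or guarantee no concurrent accesses
+// somehow).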
+func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
        switch n := n.(type) {
        case *filenode:
-               n.fileinfo.modTime = modTime
+               if ffunc != nil {
+                       ffunc(n)
+               }
        case *dirnode:
-               n.fileinfo.modTime = modTime
+               if dfunc != nil {
+                       dfunc(n)
+               }
                for _, n := range n.inodes {
-                       backdateTree(n, modTime)
+                       eachNode(n, ffunc, dfunc)
                }
        }
 }
 
+// backdateTree sets the modification time of every filenode and
+// dirnode visited by eachNode, starting at n, to modTime.
+//
+// Caller must have lock (or guarantee no concurrent accesses
+// somehow).
+func backdateTree(n inode, modTime time.Time) {
+       setFileTime := func(f *filenode) { f.fileinfo.modTime = modTime }
+       setDirTime := func(d *dirnode) { d.fileinfo.modTime = modTime }
+       eachNode(n, setFileTime, setDirTime)
+}
+
+// signatureTimeLeft returns the approximate portion of signature TTL
+// remaining (usually between 0 and 1, or negative if some signatures
+// have expired) together with the TTL used for that estimate.
+//
+// The TTL is a guess: it is the longest remaining signature lifetime
+// seen so far on this filesystem, remembered in
+// fs.guessSignatureTTL. If no stored segment carries a parseable
+// signature, the result is (1, 1).
+func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
+       now := time.Now()
+       // Start "earliest" far in the future so that any real expiry
+       // time replaces it.
+       earliest := now.Add(time.Hour * 24 * 7 * 365)
+       var latest time.Time
+
+       scanFile := func(fn *filenode) {
+               fn.Lock()
+               defer fn.Unlock()
+               for _, seg := range fn.segments {
+                       stored, isStored := seg.(storedSegment)
+                       if !isStored {
+                               continue
+                       }
+                       exp, err := signatureExpiryTime(stored.locator)
+                       if err != nil {
+                               // No usable signature on this
+                               // locator.
+                               continue
+                       }
+                       if exp.Before(earliest) {
+                               earliest = exp
+                       }
+                       if exp.After(latest) {
+                               latest = exp
+                       }
+               }
+       }
+       fs.fileSystem.root.RLock()
+       eachNode(fs.root, scanFile, nil)
+       fs.fileSystem.root.RUnlock()
+
+       if latest.IsZero() {
+               // No signatures == 100% of TTL remaining.
+               return 1, 1
+       }
+
+       // Keep the largest TTL observed so far; guessSignatureTTL is
+       // read and written while holding the root write lock.
+       ttl := latest.Sub(now)
+       fs.fileSystem.root.Lock()
+       if fs.guessSignatureTTL < ttl {
+               fs.guessSignatureTTL = ttl
+       } else {
+               ttl = fs.guessSignatureTTL
+       }
+       fs.fileSystem.root.Unlock()
+
+       remaining := earliest.Sub(now)
+       return remaining.Seconds() / ttl.Seconds(), ttl
+}
+
+// updateSignatures replaces the locator of each stored segment with
+// the locator from newmanifest that has the same hash (compared
+// after stripping all hints). This is how refreshed permission
+// signatures are installed. Segments whose hash does not appear in
+// newmanifest are left unchanged. If a hash appears more than once
+// in newmanifest, its last occurrence is used.
+//
+// It acquires the root write lock, and then each file's lock, while
+// rewriting segments.
+func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
+       newLoc := map[string]string{}
+       for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
+               if mBlkRe.MatchString(tok) {
+                       newLoc[stripAllHints(tok)] = tok
+               }
+       }
+       if len(newLoc) == 0 {
+               // Nothing to apply (e.g., an empty manifest): don't
+               // take the tree-wide write lock and walk every file
+               // for no effect.
+               return
+       }
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       eachNode(fs.root, func(fn *filenode) {
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, seg := range fn.segments {
+                       seg, ok := seg.(storedSegment)
+                       if !ok {
+                               continue
+                       }
+                       loc, ok := newLoc[stripAllHints(seg.locator)]
+                       if !ok {
+                               continue
+                       }
+                       // seg is a copy, so store it back after
+                       // changing the locator.
+                       seg.locator = loc
+                       fn.segments[idx] = seg
+               }
+       }, nil)
+}
+
 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
        if name == "" || name == "." || name == ".." {
                return nil, ErrInvalidArgument
@@ -180,7 +276,88 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
+// checkChangesOnServer refreshes block permission signatures from
+// the API server when they are getting old -- unless that has
+// already been checked recently, in which case this func is a no-op.
+//
+// The collection is fetched by UUID first (if any) and then by the
+// last known PDH, stopping once at least half of the signature TTL
+// remains. A failed request is not reported: the next ID is tried
+// instead, and the returned error is always nil.
+func (fs *collectionFileSystem) checkChangesOnServer() error {
+       if fs.uuid == "" && fs.savedPDH.Load() == "" {
+               return nil
+       }
+
+       ids := []string{fs.uuid, fs.savedPDH.Load().(string)}
+       // started becomes true once a fetch has been attempted. From
+       // then on the hold-off time is neither consulted nor updated.
+       started := false
+       for _, id := range ids {
+               if id == "" {
+                       continue
+               }
+
+               fs.lockCheckChanges.Lock()
+               if !started && time.Now().Before(fs.holdCheckChanges) {
+                       fs.lockCheckChanges.Unlock()
+                       return nil
+               }
+               remain, ttl := fs.signatureTimeLeft()
+               if !started && remain > 0.01 {
+                       // Don't look again until another 1% of the
+                       // TTL has elapsed.
+                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
+               }
+               fs.lockCheckChanges.Unlock()
+
+               if remain >= 0.5 {
+                       // All signatures are new enough.
+                       return nil
+               }
+               started = true
+
+               var coll Collection
+               params := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+               if err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, params); err != nil {
+                       continue
+               }
+               fs.updateSignatures(coll.ManifestText)
+       }
+       return nil
+}
+
+// refreshSignature refreshes the signature on a single locator, if
+// necessary, and returns the locator to use. Assume caller has lock.
+//
+// If the signature is missing, unparseable, or still valid for more
+// than a minute, locator is returned as is and checkChangesOnServer
+// is started in the background. Otherwise the collection is fetched
+// synchronously (by UUID and by last known PDH, ignoring failed
+// requests), the first block locator with the same hash is returned
+// (or locator itself if there is none), and a background task is
+// started to update all storedSegments once the caller releases
+// locks.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+       exp, err := signatureExpiryTime(locator)
+       if err != nil || time.Until(exp) > time.Minute {
+               // Synchronous update is not needed. Start an
+               // asynchronous update if needed.
+               go fs.checkChangesOnServer()
+               return locator
+       }
+       var manifests string
+       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+               if id == "" {
+                       continue
+               }
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               if err != nil {
+                       continue
+               }
+               // Keep manifests separated by whitespace so the last
+               // token of one cannot fuse with the first token of
+               // the next.
+               if n := len(manifests); n > 0 && manifests[n-1] != '\n' {
+                       manifests += "\n"
+               }
+               manifests += coll.ManifestText
+       }
+       hash := stripAllHints(locator)
+       for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
+               if mBlkRe.MatchString(tok) && stripAllHints(tok) == hash {
+                       locator = tok
+                       break
+               }
+       }
+       // updateSignatures takes the tree locks itself, so it has to
+       // run after the caller has released them.
+       go fs.updateSignatures(manifests)
+       return locator
+}
+
 func (fs *collectionFileSystem) Sync() error {
+       err := fs.checkChangesOnServer()
+       if err != nil {
+               return err
+       }
        if fs.uuid == "" {
                return nil
        }
@@ -188,19 +365,34 @@ func (fs *collectionFileSystem) Sync() error {
        if err != nil {
                return fmt.Errorf("sync failed: %s", err)
        }
-       coll := &Collection{
+       if PortableDataHash(txt) == fs.savedPDH.Load() {
+               // No local changes since last save or initial load.
+               return nil
+       }
+       coll := Collection{
                UUID:         fs.uuid,
                ManifestText: txt,
        }
-       err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+
+       selectFields := []string{"uuid", "portable_data_hash"}
+       fs.lockCheckChanges.Lock()
+       remain, _ := fs.signatureTimeLeft()
+       fs.lockCheckChanges.Unlock()
+       if remain < 0.5 {
+               selectFields = append(selectFields, "manifest_text")
+       }
+
+       err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
                "collection": map[string]string{
                        "manifest_text": coll.ManifestText,
                },
-               "select": []string{"uuid"},
+               "select": selectFields,
        })
        if err != nil {
                return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
        }
+       fs.updateSignatures(coll.ManifestText)
+       fs.savedPDH.Store(coll.PortableDataHash)
        return nil
 }
 
@@ -375,6 +567,10 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
                err = io.EOF
                return
        }
+       if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
+               ss.locator = fn.fs.refreshSignature(ss.locator)
+               fn.segments[ptr.segmentIdx] = ss
+       }
        n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
        if n > 0 {
                ptr.off += int64(n)
index beb4d61fcf72ef7696952b3bf37179334ff3abd7..fab91d1f77dc37f3708380db3796400ff01dfca5 100644 (file)
@@ -32,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
+       reads       []string             // locators from ReadAt() calls
        onWrite     func(bufcopy []byte) // called from WriteBlock, before acquiring lock
        authToken   string               // client's auth token (used for signing locators)
        sigkey      string               // blob signing key
@@ -42,8 +43,14 @@ type keepClientStub struct {
 var errStub404 = errors.New("404 block not found")
 
 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+       kcs.Lock()
+       kcs.reads = append(kcs.reads, locator)
+       kcs.Unlock()
        kcs.RLock()
        defer kcs.RUnlock()
+       if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
+               return 0, err
+       }
        buf := kcs.blocks[locator[:32]]
        if buf == nil {
                return 0, errStub404
@@ -102,6 +109,7 @@ type CollectionFSSuite struct {
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.client = NewClientFromEnv()
+       s.client.AuthToken = fixtureActiveToken
        err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
        c.Assert(err, check.IsNil)
        s.kc = &keepClientStub{
@@ -1433,6 +1441,103 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        }
 }
 
+// TestRefreshSignatures saves a two-file collection, then reopens it
+// with re-signed manifests to check that Read() refreshes block
+// signatures: synchronously when they have already expired, and in
+// the background (for every block at once) when less than half of
+// the assumed TTL remains.
+func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
+       filedata1 := "hello refresh signatures world\n"
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("d1", 0700)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte(filedata1))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+
+       filedata2 := "hello refresh signatures universe\n"
+       err = fs.Mkdir("d2", 0700)
+       c.Assert(err, check.IsNil)
+       f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte(filedata2))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       txt, err := fs.MarshalManifest(".")
+       c.Assert(err, check.IsNil)
+       var saved Collection
+       err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "select": []string{"manifest_text", "uuid", "portable_data_hash"},
+               "collection": map[string]interface{}{
+                       "manifest_text": txt,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
+       // Update signatures synchronously if they are already expired
+       // when Read() is called.
+       {
+               saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
+               fs, err := saved.FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, filedata1)
+       }
+
+       // Update signatures asynchronously if we're more than half
+       // way to TTL when Read() is called.
+       {
+               exp := time.Now().Add(2 * time.Minute)
+               saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
+               fs, err := saved.FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               buf, err := ioutil.ReadAll(f1)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, filedata1)
+
+               // Ensure fs treats the 2-minute TTL as less than half
+               // the server's signing TTL. If we don't do this,
+               // collectionfs will guess the signature is fresh,
+               // i.e., signing TTL is 2 minutes, and won't do an
+               // async refresh.
+               fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
+
+               // Re-read file1 until a read goes out with a freshly
+               // signed locator, or give up after 10 seconds.
+               refreshed := false
+               for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
+                       _, err = f1.Seek(0, io.SeekStart)
+                       c.Assert(err, check.IsNil)
+                       buf, err = ioutil.ReadAll(f1)
+                       c.Assert(err, check.IsNil)
+                       c.Assert(string(buf), check.Equals, filedata1)
+                       loc := s.kc.reads[len(s.kc.reads)-1]
+                       t, err := signatureExpiryTime(loc)
+                       c.Assert(err, check.IsNil)
+                       c.Logf("last read block %s had signature expiry time %v", loc, t)
+                       if time.Until(t) > time.Hour {
+                               refreshed = true
+                       }
+               }
+               c.Check(refreshed, check.Equals, true)
+
+               // Second locator should have been updated at the same
+               // time.
+               buf, err = ioutil.ReadAll(f2)
+               c.Assert(err, check.IsNil)
+               c.Assert(string(buf), check.Equals, filedata2)
+               loc := s.kc.reads[len(s.kc.reads)-1]
+               c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
+               t, err := signatureExpiryTime(loc)
+               c.Assert(err, check.IsNil)
+               c.Logf("last read block %s had signature expiry time %v", loc, t)
+               c.Check(time.Until(t) > time.Hour, check.Equals, true)
+       }
+}
+
 var bigmanifest = func() string {
        var buf bytes.Buffer
        for i := 0; i < 2000; i++ {
index f68e7c8b08e97b4dd2fdd9bd9252ecd563060b59..89435132713b0a14a8438d1ebe07390beb4a8893 100644 (file)
@@ -295,6 +295,11 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        err = s.client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+oob.UUID, nil, nil)
        c.Assert(err, check.IsNil)
 
+       wf, err = s.fs.OpenFile("/home/A Project/oob/test.txt", os.O_CREATE|os.O_RDWR, 0700)
+       c.Assert(err, check.IsNil)
+       err = wf.Close()
+       c.Check(err, check.IsNil)
+
        err = project.Sync()
        c.Check(err, check.NotNil) // can't update the deleted collection
        _, err = s.fs.Open("/home/A Project/oob")