16535: Add s3 endpoint.
author     Tom Clegg <tom@tomclegg.ca>
           Mon, 13 Jul 2020 13:55:57 +0000 (09:55 -0400)
committer  Tom Clegg <tom@tomclegg.ca>
           Mon, 13 Jul 2020 13:55:57 +0000 (09:55 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_deferred.go
sdk/go/arvados/fs_site.go
services/keep-web/handler.go
services/keep-web/s3_test.go [new file with mode: 0644]

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 37bd494914df507dc9fc193576dc9e0afcc98ea9..5970ec61208e0a183e9f16e2e53396f8ab5b85de 100644
@@ -193,6 +193,25 @@ func (fs *collectionFileSystem) Size() int64 {
        return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
+// asChildNode() repackages fs as an inode that can be used as a child
+// node in a different fs. Not goroutine-safe.
+//
+// After calling asChildNode(), the caller should not use fs directly.
+func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
+       root := fs.rootnode().(*dirnode)
+       root.SetParent(parent, name)
+       return &collectionfsnode{dirnode: root, fs: fs}
+}
+
+type collectionfsnode struct {
+       *dirnode
+       fs *collectionFileSystem
+}
+
+func (cn *collectionfsnode) Sync() error {
+       return cn.fs.Sync()
+}
+
 // filenodePtr is an offset into a file that is (usually) efficient to
 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
 // then
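
For context, the intended call pattern (a minimal sketch; cfs is a CollectionFileSystem
whose concrete type is *collectionFileSystem, as at the call sites changed below in
fs_deferred.go and fs_site.go):

	// Mount a collection filesystem as a child of another filesystem's
	// inode. The returned collectionfsnode behaves like the collection's
	// root dirnode, but its Sync() commits the collection, so a tree walk
	// over the enclosing filesystem can flush it (see syncTree below).
	child := cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
	// From here on, callers use child; cfs is not used directly.
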
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 439eaec7c2a5dbde49f2fd2851551238a22166ec..4eb48b2f77bdde92736c22cb3be1c1789c0ae825 100644
@@ -37,9 +37,7 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
                        log.Printf("BUG: unhandled error: %s", err)
                        return placeholder
                }
-               root := cfs.rootnode()
-               root.SetParent(parent, coll.Name)
-               return root
+               return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
        }}
 }
 
@@ -87,6 +85,18 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
        return dn.realinode().Child(name, replace)
 }
 
+// Sync is currently unimplemented: it returns ErrInvalidArgument once the
+// real inode has been created, and is a harmless no-op before then.
+func (dn *deferrednode) Sync() error {
+       dn.mtx.Lock()
+       defer dn.mtx.Unlock()
+       if dn.created {
+               return ErrInvalidArgument
+       } else {
+               return nil
+       }
+}
+
 func (dn *deferrednode) Truncate(size int64) error       { return dn.realinode().Truncate(size) }
 func (dn *deferrednode) SetParent(p inode, name string)  { dn.realinode().SetParent(p, name) }
 func (dn *deferrednode) IsDir() bool                     { return dn.currentinode().IsDir() }
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 7826d335c81fa93cfe54bf39a81a66335a65d336..2e131c33f89d54c7f476f4418417d81f1b3b19b6 100644
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "fmt"
        "os"
        "strings"
        "sync"
@@ -116,8 +117,41 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
 
 func (fs *customFileSystem) Sync() error {
        fs.staleLock.Lock()
-       defer fs.staleLock.Unlock()
        fs.staleThreshold = time.Now()
+       fs.staleLock.Unlock()
+       return fs.syncTree("/", fs.root.inode)
+}
+
+// syncTree calls node.Sync() if it has its own Sync method, otherwise
+// it calls syncTree() on all of node's children.
+//
+// Returns ErrInvalidArgument if node does not implement Sync() and
+// isn't a directory (or if Readdir() fails for any other reason).
+func (fs *customFileSystem) syncTree(path string, node inode) error {
+       if vn, ok := node.(*vdirnode); ok {
+               node = vn.inode
+       }
+       if syncer, ok := node.(interface{ Sync() error }); ok {
+               err := syncer.Sync()
+               if err != nil {
+                       return fmt.Errorf("%s: %T: %w", path, syncer, err)
+               }
+               return nil
+       }
+       fis, err := node.Readdir()
+       if err != nil {
+               return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
+       }
+       for _, fi := range fis {
+               child, err := node.Child(fi.Name(), nil)
+               if err != nil {
+                       continue
+               }
+               err = fs.syncTree(path+"/"+fi.Name(), child)
+               if err != nil {
+                       return err
+               }
+       }
        return nil
 }
 
@@ -153,9 +187,7 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
        if err != nil {
                return nil
        }
-       root := cfs.rootnode()
-       root.SetParent(parent, id)
-       return root
+       return cfs.(*collectionFileSystem).asChildNode(parent, id)
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
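
What the new Sync() buys (a minimal sketch; the collection UUID and file name are
placeholders, and error handling is elided):

	// Writes made through a site filesystem do not update the collection
	// record until the mounted collection's own Sync() saves the new
	// manifest. fs.Sync() now walks the tree, finds each child that
	// implements Sync() error (e.g. the collectionfsnode wrapper added in
	// fs_collection.go), and calls it. serveS3 relies on this after a PUT.
	fs := client.SiteFileSystem(kc)
	f, _ := fs.OpenFile("/by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx/newfile", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	f.Write([]byte("data"))
	f.Close()
	fs.Sync() // descends to the collection node and commits it
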
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 643ca4f587f51bc9b353ab29b4a82869d96578a8..b8071b914bcd4fe835359e5fbc73b5328fce57e2 100644
@@ -6,6 +6,7 @@ package main
 
 import (
        "encoding/json"
+       "fmt"
        "html"
        "html/template"
        "io"
@@ -227,6 +228,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
        }
 
+       if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+               split := strings.SplitN(auth[4:], ":", 2)
+               if len(split) < 2 {
+                       w.WriteHeader(http.StatusUnauthorized)
+                       return
+               }
+               h.serveS3(w, r, split[0])
+               return
+       } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+               w.WriteHeader(http.StatusBadRequest)
+               fmt.Fprintln(w, "V4 signature is not supported")
+               return
+       }
+
        pathParts := strings.Split(r.URL.Path[1:], "/")
 
        var stripParts int
@@ -509,6 +524,104 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
 }
 
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+       arv = h.clientPool.Get()
+       if arv == nil {
+               return nil, nil, nil, nil, h.clientPool.Err()
+       }
+       release = func() { h.clientPool.Put(arv) }
+       arv.ApiToken = token
+       kc, err = keepclient.MakeKeepClient(arv)
+       if err != nil {
+               release()
+               return
+       }
+       kc.RequestID = reqID
+       client = (&arvados.Client{
+               APIHost:   arv.ApiServer,
+               AuthToken: arv.ApiToken,
+               Insecure:  arv.ApiInsecure,
+       }).WithRequestID(reqID)
+       return
+}
+
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
+       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+       if err != nil {
+               http.Error(w, "error setting up arvados client: "+err.Error(), http.StatusInternalServerError)
+               return
+       }
+       defer release()
+
+       r.URL.Path = "/by_id" + r.URL.Path
+
+       fs := client.SiteFileSystem(kc)
+       fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+       switch r.Method {
+       case "GET":
+               fi, err := fs.Stat(r.URL.Path)
+               if os.IsNotExist(err) {
+                       http.Error(w, err.Error(), http.StatusNotFound)
+                       return
+               } else if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
+               } else if fi.IsDir() {
+                       http.Error(w, "not found", http.StatusNotFound)
+                       return
+               }
+               http.FileServer(fs).ServeHTTP(w, r)
+               return
+       case "PUT":
+               f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+               if os.IsNotExist(err) {
+                       // create missing intermediate directories, then try again
+                       for i, c := range r.URL.Path {
+                               if i > 0 && c == '/' {
+                                       dir := r.URL.Path[:i]
+                                       err := fs.Mkdir(dir, 0755)
+                                       if err != nil && err != os.ErrExist {
+                                               err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+                                               http.Error(w, err.Error(), http.StatusInternalServerError)
+                                               return
+                                       }
+                               }
+                       }
+                       f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+               }
+               if err != nil {
+                       err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+                       http.Error(w, err.Error(), http.StatusBadRequest)
+                       return
+               }
+               defer f.Close()
+               _, err = io.Copy(f, r.Body)
+               if err != nil {
+                       err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+                       http.Error(w, err.Error(), http.StatusBadGateway)
+                       return
+               }
+               err = f.Close()
+               if err != nil {
+                       err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+                       http.Error(w, err.Error(), http.StatusBadGateway)
+                       return
+               }
+               err = fs.Sync()
+               if err != nil {
+                       err = fmt.Errorf("sync failed: %w", err)
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
+               }
+               w.WriteHeader(http.StatusOK)
+               return
+       default:
+               http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+}
+
 func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
        if len(tokens) == 0 {
                w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
@@ -519,25 +631,13 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-       arv := h.clientPool.Get()
-       if arv == nil {
+       _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+       if err != nil {
                http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
                return
        }
-       defer h.clientPool.Put(arv)
-       arv.ApiToken = tokens[0]
+       defer release()
 
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-       kc.RequestID = r.Header.Get("X-Request-Id")
-       client := (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
        fs := client.SiteFileSystem(kc)
        fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
        f, err := fs.Open(r.URL.Path)
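
With this change, keep-web treats any request whose Authorization header uses the AWS
V2 format ("AWS accesskey:signature") as an S3 request, taking the access key field as
the Arvados token and ignoring the signature; V4 signatures get a 400. A minimal sketch
of a raw PUT (host name, collection UUID, and token are placeholders; the tests below
use a real S3 client library instead):

	// The bucket name is the collection UUID, addressed path-style; the
	// handler prepends "/by_id" and opens the result in the site
	// filesystem, creating intermediate directories as needed.
	req, _ := http.NewRequest("PUT",
		"https://keep-web.example/zzzzz-4zz18-xxxxxxxxxxxxxxx/hello.txt",
		strings.NewReader("hello\n"))
	req.Header.Set("Authorization", "AWS "+arvadosToken+":unused")
	resp, err := http.DefaultClient.Do(req)
	// check err and resp.StatusCode; on 200 the file has been committed
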
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
new file mode 100644
index 0000000..600b834
--- /dev/null
+++ b/services/keep-web/s3_test.go
@@ -0,0 +1,173 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "crypto/rand"
+       "io/ioutil"
+       "os"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/AdRoll/goamz/aws"
+       "github.com/AdRoll/goamz/s3"
+       check "gopkg.in/check.v1"
+)
+
+func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+       var coll arvados.Collection
+       arv := arvados.NewClientFromEnv()
+       arv.AuthToken = arvadostest.ActiveToken
+       err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+               "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+       }})
+       c.Assert(err, check.IsNil)
+       ac, err := arvadosclient.New(arv)
+       c.Assert(err, check.IsNil)
+       kc, err := keepclient.MakeKeepClient(ac)
+       c.Assert(err, check.IsNil)
+       fs, err := coll.FileSystem(arv, kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+       c.Assert(err, check.IsNil)
+       _, err = f.Write([]byte("⛵\n"))
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       err = fs.Sync()
+       c.Assert(err, check.IsNil)
+
+       auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+       region := aws.Region{
+               Name:       s.testServer.Addr,
+               S3Endpoint: "http://" + s.testServer.Addr,
+       }
+       client := s3.New(*auth, region)
+       bucket := &s3.Bucket{
+               S3:   client,
+               Name: coll.UUID,
+       }
+       return arv, coll, bucket
+}
+
+func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
+       err := arv.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Check(err, check.IsNil)
+}
+
+func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
+       arv, coll, bucket := s.s3setup(c)
+       defer s.s3teardown(c, arv, coll)
+
+       rdr, err := bucket.GetReader("emptyfile")
+       c.Assert(err, check.IsNil)
+       buf, err := ioutil.ReadAll(rdr)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf), check.Equals, 0)
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
+
+       rdr, err = bucket.GetReader("missingfile")
+       c.Check(err, check.NotNil)
+
+       rdr, err = bucket.GetReader("sailboat.txt")
+       c.Check(err, check.IsNil)
+       buf, err = ioutil.ReadAll(rdr)
+       c.Check(err, check.IsNil)
+       c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
+}
+
+func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
+       arv, coll, bucket := s.s3setup(c)
+       defer s.s3teardown(c, arv, coll)
+
+       for _, trial := range []struct {
+               objname string
+               size    int
+       }{
+               {
+                       objname: "newfile",
+                       size:    128000000,
+               }, {
+                       objname: "newdir/newfile",
+                       size:    1 << 26,
+               }, {
+                       objname: "newdir1/newdir2/newfile",
+                       size:    0,
+               },
+       } {
+               c.Logf("=== %v", trial)
+
+               _, err := bucket.GetReader(trial.objname)
+               c.Assert(err, check.NotNil)
+
+               buf := make([]byte, trial.size)
+               rand.Read(buf)
+
+               err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+               c.Check(err, check.IsNil)
+
+               rdr, err := bucket.GetReader(trial.objname)
+               if !c.Check(err, check.IsNil) {
+                       continue
+               }
+               buf2, err := ioutil.ReadAll(rdr)
+               c.Check(err, check.IsNil)
+               c.Check(buf2, check.HasLen, len(buf))
+               c.Check(buf2, check.DeepEquals, buf)
+       }
+}
+
+func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
+       arv, coll, bucket := s.s3setup(c)
+       defer s.s3teardown(c, arv, coll)
+
+       for _, trial := range []struct {
+               objname string
+       }{
+               {
+                       objname: "emptyfile/newname", // emptyfile exists, see s3setup()
+               }, {
+                       objname: "emptyfile/", // emptyfile exists, see s3setup()
+               }, {
+                       objname: "emptydir", // dir already exists, see s3setup()
+               }, {
+                       objname: "emptydir/",
+               }, {
+                       objname: "emptydir//",
+               }, {
+                       objname: "newdir/",
+               }, {
+                       objname: "newdir//",
+               }, {
+                       objname: "/",
+               }, {
+                       objname: "//",
+               }, {
+                       objname: "foo//bar",
+               }, {
+                       objname: "",
+               },
+       } {
+               c.Logf("=== %v", trial)
+
+               buf := make([]byte, 1234)
+               rand.Read(buf)
+
+               err := bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+               if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", trial.objname)) {
+                       continue
+               }
+
+               _, err = bucket.GetReader(trial.objname)
+               c.Check(err, check.NotNil)
+       }
+}