return fs.fileSystem.root.(*dirnode).TreeSize()
}
+// asChildNode() repackages fs as an inode that can be used as a child
+// node in a different fs. Not goroutine-safe.
+//
+// After calling asChildNode(), the caller should not use fs directly.
+//
+// The returned collectionfsnode embeds fs's root dirnode and keeps a
+// reference to fs itself so the node's Sync() can delegate to fs.Sync().
+func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
+ root := fs.rootnode().(*dirnode)
+ root.SetParent(parent, name)
+ return &collectionfsnode{dirnode: root, fs: fs}
+}
+
+// collectionfsnode is a collectionFileSystem's root dirnode, wrapped
+// so it can be used as a child node in a different filesystem (see
+// asChildNode) while retaining a reference to the original fs.
+type collectionfsnode struct {
+ *dirnode
+ fs *collectionFileSystem
+}
+
+// Sync delegates to the wrapped collectionFileSystem's Sync().
+func (cn *collectionfsnode) Sync() error {
+ return cn.fs.Sync()
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- root := cfs.rootnode()
- root.SetParent(parent, coll.Name)
- return root
+ return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
}}
}
return dn.realinode().Child(name, replace)
}
+// Sync is currently unimplemented, except when it's a no-op because
+// the real inode hasn't been created.
+func (dn *deferrednode) Sync() error {
+ dn.mtx.Lock()
+ defer dn.mtx.Unlock()
+ if dn.created {
+ // The real inode exists, but delegating Sync to it is not
+ // (yet) implemented.
+ return ErrInvalidArgument
+ }
+ // Nothing has been created yet, so there is nothing to sync.
+ return nil
+}
+
+// These methods delegate to dn.realinode() (which may create the real
+// inode) or, for the read-only IsDir query, dn.currentinode().
func (dn *deferrednode) Truncate(size int64) error { return dn.realinode().Truncate(size) }
func (dn *deferrednode) SetParent(p inode, name string) { dn.realinode().SetParent(p, name) }
func (dn *deferrednode) IsDir() bool { return dn.currentinode().IsDir() }
package arvados
import (
+ "fmt"
"os"
"strings"
"sync"
func (fs *customFileSystem) Sync() error {
 fs.staleLock.Lock()
- defer fs.staleLock.Unlock()
 fs.staleThreshold = time.Now()
+ // Unlock before walking the tree (instead of deferring) so the
+ // stale-threshold lock is not held during the potentially slow
+ // recursive sync below.
+ fs.staleLock.Unlock()
+ return fs.syncTree("/", fs.root.inode)
+}
+
+// syncTree calls node.Sync() if it has its own Sync method, otherwise
+// it calls syncTree() on all of node's children.
+//
+// Returns ErrInvalidArgument if node does not implement Sync() and
+// isn't a directory (or if Readdir() fails for any other reason).
+func (fs *customFileSystem) syncTree(path string, node inode) error {
+ if vn, ok := node.(*vdirnode); ok {
+ // Look through the vdirnode wrapper at the wrapped inode.
+ node = vn.inode
+ }
+ if syncer, ok := node.(interface{ Sync() error }); ok {
+ err := syncer.Sync()
+ if err != nil {
+ return fmt.Errorf("%s: %T: %w", path, syncer, err)
+ }
+ return nil
+ }
+ fis, err := node.Readdir()
+ if err != nil {
+ return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
+ }
+ for _, fi := range fis {
+ child, err := node.Child(fi.Name(), nil)
+ if err != nil {
+ // A child that can't be looked up is skipped rather
+ // than aborting the sync of the remaining children.
+ continue
+ }
+ err = fs.syncTree(path+"/"+fi.Name(), child)
+ if err != nil {
+ return err
+ }
+ }
 return nil
}
if err != nil {
return nil
}
- root := cfs.rootnode()
- root.SetParent(parent, id)
- return root
+ return cfs.(*collectionFileSystem).asChildNode(parent, id)
}
func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
import (
"encoding/json"
+ "fmt"
"html"
"html/template"
"io"
w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
+ // Route S3 API requests: an "AWS " (V2 signature) Authorization
+ // header carries the token in the access-key field (see serveS3).
+ if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+ split := strings.SplitN(auth[4:], ":", 2)
+ if len(split) < 2 {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ h.serveS3(w, r, split[0])
+ return
+ } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+ w.WriteHeader(http.StatusBadRequest)
+ // Fprintln writes the message to the response body;
+ // fmt.Println(w, ...) would have written to stdout instead.
+ fmt.Fprintln(w, "V4 signature is not supported")
+ return
+ }
+
pathParts := strings.Split(r.URL.Path[1:], "/")
var stripParts int
}
}
+// getClients gets an ArvadosClient, a KeepClient, and an
+// arvados.Client from h.clientPool, all configured to use the given
+// token and request ID.
+//
+// On success the caller must call release() when finished with the
+// clients; when err is non-nil, release() must not be called.
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+ arv = h.clientPool.Get()
+ if arv == nil {
+ // Make sure err is non-nil so callers can detect the pool
+ // failure with a plain err != nil check.
+ if err = h.clientPool.Err(); err == nil {
+ err = fmt.Errorf("client pool returned nil and no error")
+ }
+ return
+ }
+ release = func() { h.clientPool.Put(arv) }
+ arv.ApiToken = token
+ kc, err = keepclient.MakeKeepClient(arv)
+ if err != nil {
+ // Return the client to the pool now, and return a nil
+ // release func so a stray call can't double-Put.
+ release()
+ release = nil
+ return
+ }
+ kc.RequestID = reqID
+ client = (&arvados.Client{
+ APIHost: arv.ApiServer,
+ AuthToken: arv.ApiToken,
+ Insecure: arv.ApiInsecure,
+ }).WithRequestID(reqID)
+ return
+}
+
+// serveS3 implements a small subset of the S3 REST API: GET retrieves
+// an object (file), and PUT creates or replaces an object, creating
+// missing intermediate directories. token is used as the Arvados auth
+// token.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ // Report err itself: h.clientPool.Err() can be nil even when
+ // err is not (e.g. MakeKeepClient failure), which would make
+ // the previous .Error() call panic.
+ http.Error(w, "Pool failed: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+ defer release()
+
+ r.URL.Path = "/by_id" + r.URL.Path
+
+ fs := client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+ switch r.Method {
+ case "GET":
+ fi, err := fs.Stat(r.URL.Path)
+ if os.IsNotExist(err) {
+ http.Error(w, err.Error(), http.StatusNotFound)
+ return
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ } else if fi.IsDir() {
+ // A directory is not a retrievable S3 object; return here
+ // so the FileServer below doesn't serve a listing.
+ http.Error(w, "not found", http.StatusNotFound)
+ return
+ }
+ http.FileServer(fs).ServeHTTP(w, r)
+ return
+ case "PUT":
+ f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ if os.IsNotExist(err) {
+ // create missing intermediate directories, then try again
+ for i, c := range r.URL.Path {
+ if i > 0 && c == '/' {
+ dir := r.URL.Path[:i]
+ err := fs.Mkdir(dir, 0755)
+ // os.IsExist also matches wrapped "already exists"
+ // errors (e.g. *os.PathError), which a direct
+ // err != os.ErrExist comparison would miss.
+ if err != nil && !os.IsExist(err) {
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ }
+ }
+ f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ }
+ if err != nil {
+ err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ defer f.Close()
+ _, err = io.Copy(f, r.Body)
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return
+ }
+ // Check Close()'s error too; writes may not be flushed until
+ // the file is closed.
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return
+ }
+ err = fs.Sync()
+ if err != nil {
+ err = fmt.Errorf("sync failed: %w", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ return
+ default:
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+}
+
func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
if len(tokens) == 0 {
w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
- arv := h.clientPool.Get()
- if arv == nil {
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+ if err != nil {
http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
return
}
- defer h.clientPool.Put(arv)
- arv.ApiToken = tokens[0]
+ defer release()
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
- return
- }
- kc.RequestID = r.Header.Get("X-Request-Id")
- client := (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(r.Header.Get("X-Request-Id"))
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "crypto/rand"
+ "io/ioutil"
+ "os"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/AdRoll/goamz/aws"
+ "github.com/AdRoll/goamz/s3"
+ check "gopkg.in/check.v1"
+)
+
+// s3setup creates a test collection containing an empty file and an
+// empty directory, adds "sailboat.txt" through the collection
+// filesystem, and returns an S3 bucket client pointed at the test
+// server, using the collection UUID as the bucket name and the active
+// token as both S3 access key and secret.
+func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+ var coll arvados.Collection
+ arv := arvados.NewClientFromEnv()
+ arv.AuthToken = arvadostest.ActiveToken
+ err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+ }})
+ c.Assert(err, check.IsNil)
+ ac, err := arvadosclient.New(arv)
+ c.Assert(err, check.IsNil)
+ kc, err := keepclient.MakeKeepClient(ac)
+ c.Assert(err, check.IsNil)
+ fs, err := coll.FileSystem(arv, kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("⛵\n"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ err = fs.Sync()
+ c.Assert(err, check.IsNil)
+
+ auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+ region := aws.Region{
+ Name: s.testServer.Addr,
+ S3Endpoint: "http://" + s.testServer.Addr,
+ }
+ client := s3.New(*auth, region)
+ bucket := &s3.Bucket{
+ S3: client,
+ Name: coll.UUID,
+ }
+ return arv, coll, bucket
+}
+
+// s3teardown deletes the test collection created by s3setup.
+func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
+ err := arv.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Check(err, check.IsNil)
+}
+
+// TestS3GetObject checks that existing objects (including a zero-byte
+// file) can be retrieved via S3 GET, and a missing object returns an
+// error.
+func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ rdr, err := bucket.GetReader("emptyfile")
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf), check.Equals, 0)
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
+
+ rdr, err = bucket.GetReader("missingfile")
+ c.Check(err, check.NotNil)
+
+ rdr, err = bucket.GetReader("sailboat.txt")
+ c.Check(err, check.IsNil)
+ buf, err = ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
+}
+
+// TestS3PutObjectSuccess uploads objects of various sizes (including
+// a large ~128 MB object and a zero-byte object in nested new
+// directories) and verifies the stored content round-trips exactly.
+func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ for _, trial := range []struct {
+ objname string
+ size int
+ }{
+ {
+ objname: "newfile",
+ size: 128000000,
+ }, {
+ objname: "newdir/newfile",
+ size: 1 << 26,
+ }, {
+ objname: "newdir1/newdir2/newfile",
+ size: 0,
+ },
+ } {
+ c.Logf("=== %v", trial)
+
+ // The object must not exist before the PUT.
+ _, err := bucket.GetReader(trial.objname)
+ c.Assert(err, check.NotNil)
+
+ buf := make([]byte, trial.size)
+ rand.Read(buf)
+
+ err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ c.Check(err, check.IsNil)
+
+ rdr, err := bucket.GetReader(trial.objname)
+ if !c.Check(err, check.IsNil) {
+ continue
+ }
+ buf2, err := ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(buf2, check.HasLen, len(buf))
+ c.Check(buf2, check.DeepEquals, buf)
+ }
+}
+
+// TestS3PutObjectFailure tries to upload objects with invalid names
+// (empty, trailing/doubled/leading slashes, or colliding with an
+// existing file or directory) and verifies each PUT is rejected and
+// the object does not appear afterwards.
+func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ for _, trial := range []struct {
+ objname string
+ }{
+ {
+ objname: "emptyfile/newname", // emptyfile exists, see s3setup()
+ }, {
+ objname: "emptyfile/", // emptyfile exists, see s3setup()
+ }, {
+ objname: "emptydir", // dir already exists, see s3setup()
+ }, {
+ objname: "emptydir/",
+ }, {
+ objname: "emptydir//",
+ }, {
+ objname: "newdir/",
+ }, {
+ objname: "newdir//",
+ }, {
+ objname: "/",
+ }, {
+ objname: "//",
+ }, {
+ objname: "foo//bar",
+ }, {
+ objname: "",
+ },
+ } {
+ c.Logf("=== %v", trial)
+
+ buf := make([]byte, 1234)
+ rand.Read(buf)
+
+ err := bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", trial.objname)) {
+ continue
+ }
+
+ _, err = bucket.GetReader(trial.objname)
+ c.Check(err, check.NotNil)
+ }
+}