16535: Add S3 DeleteObject API.
authorTom Clegg <tom@tomclegg.ca>
Thu, 20 Aug 2020 20:24:19 +0000 (16:24 -0400)
committerTom Clegg <tom@tomclegg.ca>
Thu, 20 Aug 2020 20:24:19 +0000 (16:24 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

services/keep-web/s3.go
services/keep-web/s3_test.go

index 12e294d9339710f861a491518c5dcb4d541f2f10..01bc8b704788da25b835f9b11ef43db0f079fc26 100644 (file)
@@ -205,6 +205,54 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                }
                w.WriteHeader(http.StatusOK)
                return true
+       case r.Method == http.MethodDelete:
+               if !objectNameGiven || r.URL.Path == "/" {
+                       http.Error(w, "missing object name in DELETE request", http.StatusBadRequest)
+                       return true
+               }
+               fspath := "by_id" + r.URL.Path
+               if strings.HasSuffix(fspath, "/") {
+                       fspath = strings.TrimSuffix(fspath, "/")
+                       fi, err := fs.Stat(fspath)
+                       if os.IsNotExist(err) {
+                               w.WriteHeader(http.StatusNoContent)
+                               return true
+                       } else if err != nil {
+                               http.Error(w, err.Error(), http.StatusInternalServerError)
+                               return true
+                       } else if !fi.IsDir() {
+                               // if "foo" exists and is a file, then
+                               // "foo/" doesn't exist, so we say
+                               // delete was successful.
+                               w.WriteHeader(http.StatusNoContent)
+                               return true
+                       }
+               } else if fi, err := fs.Stat(fspath); err == nil && fi.IsDir() {
+                       // if "foo" is a dir, it is visible via S3
+                       // only as "foo/", not "foo" -- so we leave
+                       // the dir alone and return 204 to indicate
+                       // that "foo" does not exist.
+                       w.WriteHeader(http.StatusNoContent)
+                       return true
+               }
+               err = fs.Remove(fspath)
+               if os.IsNotExist(err) {
+                       w.WriteHeader(http.StatusNoContent)
+                       return true
+               }
+               if err != nil {
+                       err = fmt.Errorf("rm failed: %w", err)
+                       http.Error(w, err.Error(), http.StatusBadRequest)
+                       return true
+               }
+               err = fs.Sync()
+               if err != nil {
+                       err = fmt.Errorf("sync failed: %w", err)
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return true
+               }
+               w.WriteHeader(http.StatusNoContent)
+               return true
        default:
                http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
                return true
index 73553ff4d31b4ff7538eaecbff56c94711ad57bb..85921f84d91ae37eb29e3dfd14c508f5a8e4342d 100644 (file)
@@ -260,6 +260,43 @@ func (s *IntegrationSuite) TestS3ProjectPutObjectNotSupported(c *check.C) {
        }
 }
 
+func (s *IntegrationSuite) TestS3CollectionDeleteObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3DeleteObject(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectDeleteObject(c *check.C) {
+       stage := s.s3setup(c)
+       defer stage.teardown(c)
+       s.testS3DeleteObject(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3DeleteObject(c *check.C, bucket *s3.Bucket, prefix string) {
+       s.testServer.Config.cluster.Collections.S3FolderObjects = true
+       for _, trial := range []struct {
+               path string
+       }{
+               {"/"},
+               {"nonexistentfile"},
+               {"emptyfile"},
+               {"sailboat.txt"},
+               {"sailboat.txt/"},
+               {"emptydir"},
+               {"emptydir/"},
+       } {
+               objname := prefix + trial.path
+               comment := check.Commentf("objname %q", objname)
+
+               err := bucket.Del(objname)
+               if trial.path == "/" {
+                       c.Check(err, check.NotNil)
+                       continue
+               }
+               c.Check(err, check.IsNil, comment)
+               _, err = bucket.GetReader(objname)
+               c.Check(err, check.NotNil, comment)
+       }
+}
+
 func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)