Merge branch 'master' into 10477-upgrade-aws-s3-driver
authorWard Vandewege <ward@curii.com>
Tue, 4 Aug 2020 19:45:07 +0000 (15:45 -0400)
committerWard Vandewege <ward@curii.com>
Tue, 4 Aug 2020 19:45:29 +0000 (15:45 -0400)
refs #10477

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

doc/install/configure-s3-object-storage.html.textile.liquid
go.mod
go.sum
lib/config/config.default.yml
lib/config/generated_config.go
sdk/go/arvados/config.go
services/keepstore/s3_volume.go
services/keepstore/s3aws_volume.go [new file with mode: 0644]
services/keepstore/s3aws_volume_test.go [new file with mode: 0644]

index b960ac1fda0c2ab1fbaae77e4ae3c875b8dec0bc..76a2f3ab5723121cb2d0ae9d7e4724c5b2c14d06 100644 (file)
@@ -64,6 +64,9 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
           # might be needed for other S3-compatible services.
           V2Signature: false
 
+          # Use the AWS S3 v2 Go driver instead of the goamz driver.
+          UseAWSS3v2Driver: false
+
           # Requested page size for "list bucket contents" requests.
           IndexPageSize: 1000
 
@@ -94,3 +97,9 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
         # classes" in the "Admin" section of doc.arvados.org.
         StorageClasses: null
 </code></pre></notextile>
+
+Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @UseAWSS3v2Driver@ flag to @true@.
+
+The @aws-sdk-go-v2@ driver does not support the old S3 v2 signing algorithm. This does not affect interacting with AWS S3 itself, but it might be an issue when Keep is backed by a very old version of a third-party S3-compatible service.
+
+The @aws-sdk-go-v2@ driver can improve read performance by 50-100% over the @goamz@ driver, but it has not had as much production use. See the "wiki":https://dev.arvados.org/projects/arvados/wiki/Keep_real_world_performance_numbers for details.
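As a sketch of how the flag fits into the cluster configuration, a minimal @Volumes@ entry that switches a keepstore volume to the new driver might look like the following; the volume UUID and bucket name are placeholders, and credentials and the remaining driver parameters are omitted here:

<notextile><pre><code>    Volumes:
      zzzzz-nyw5e-000000000000000:
        Driver: S3
        DriverParameters:
          Bucket: example-bucket
          Region: us-east-1
          UseAWSS3v2Driver: true
</code></pre></notextile>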
diff --git a/go.mod b/go.mod
index 884d1fcdac8637e77fb349aa569ad8fcbb5b7924..71052882adbeff703ae81a21900561afe15c8743 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
        github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
        github.com/arvados/cgofuse v1.2.0-arvados1
        github.com/aws/aws-sdk-go v1.25.30
+       github.com/aws/aws-sdk-go-v2 v0.23.0
        github.com/bgentry/speakeasy v0.1.0 // indirect
        github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
        github.com/coreos/go-oidc v2.1.0+incompatible
@@ -37,6 +38,7 @@ require (
        github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
        github.com/jmcvetta/randutil v0.0.0-20150817122601-2bb1b664bcff
        github.com/jmoiron/sqlx v1.2.0
+       github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc
        github.com/julienschmidt/httprouter v1.2.0
        github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7 // indirect
        github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
@@ -58,7 +60,7 @@ require (
        github.com/stretchr/testify v1.4.0 // indirect
        github.com/xanzy/ssh-agent v0.1.0 // indirect
        golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
-       golang.org/x/net v0.0.0-20190620200207-3b0461eec859
+       golang.org/x/net v0.0.0-20200202094626-16171245cfb2
        golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
        golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
        google.golang.org/api v0.13.0
diff --git a/go.sum b/go.sum
index ead655c9b276164c2a7e9766ea57d7b96ef3f82a..2565964e7d45121d76e59e0c7b5e21743beaaa28 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -21,14 +21,18 @@ github.com/arvados/cgofuse v1.2.0-arvados1 h1:4Q4vRJ4hbTCcI4gGEaa6hqwj3rqlUuzeFQ
 github.com/arvados/cgofuse v1.2.0-arvados1/go.mod h1:79WFV98hrkRHK9XPhh2IGGOwpFSjocsWubgxAs2KhRc=
 github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef h1:cl7DIRbiAYNqaVxg3CZY8qfZoBOKrj06H/x9SPGaxas=
 github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef/go.mod h1:rCtgyMmBGEbjTm37fCuBYbNL0IhztiALzo3OB9HyiOM=
+github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
 github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
+github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
 github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092 h1:0Di2onNnlN5PAyWPbqlPyN45eOQ+QW/J9eqLynt4IV4=
 github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092/go.mod h1:8IzBjZCRSnsvM6MJMG8HNNtnzMl48H22rbJL2kRUJ0Y=
 github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
@@ -74,7 +78,7 @@ github.com/go-ldap/ldap v3.0.3+incompatible h1:HTeSZO8hWMS1Rgb2Ziku6b8a7qRIZZMHj
 github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
-github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -91,6 +95,7 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -114,6 +119,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
 github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc/go.mod h1:fNiSoOiEI5KlkWXn26OwKnNe58ilTIkpBlgOrt7Olu8=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
@@ -171,13 +178,17 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
 github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
 github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5 h1:Jw7W4WMfQDxsXvfeFSaS2cHlY7bAF4MGrgnbd0+Uo78=
 github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
+github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63/go.mod h1:n+VKSARF5y/tS9XFSP7vWDfS+GUC5vs/YT7M5XDTUEM=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
 github.com/src-d/gcfg v1.3.0 h1:2BEDr8r0I0b8h/fOqwtxCEiq2HJu8n2JGZJQFGXWLjg=
 github.com/src-d/gcfg v1.3.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -206,6 +217,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190310074541-c10a0554eabf/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
@@ -214,6 +226,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowK
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
@@ -227,6 +240,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190310054646-10058d7d4faa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -242,10 +256,12 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190308174544-00c44ba9c14f/go.mod h1:25r3+/G6/xytQM8iWZKq3Hn0kr0rgFKPUNVEL/dr3z4=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c h1:97SnQk1GYRXJgvwZ8fadnxDOWfKvkNQHH3CtZntPSrM=
 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
 google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
@@ -267,6 +283,7 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
 gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
 gopkg.in/src-d/go-billy.v4 v4.0.1 h1:iMxwQPj2cuKRyaIZ985zxClkcdTtT5VpXYf4PTJc0Ek=
index 96291a2d4b9bdd2a7ce3341feb9c0e8254cc4879..a2a34448f11cced6b01a6343eb56acc314e5556c 100644 (file)
@@ -1088,6 +1088,8 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          # Use the aws-sdk-go-v2 driver instead of goamz
+          UseAWSS3v2Driver: false
 
           # For S3 driver, potentially unsafe tuning parameter,
           # intentionally excluded from main documentation.
index 2790477e4a9c46da6d6c859334b834e5ea888690..bddb5cedb1df8428024f6461ed52ede12d8b9607 100644 (file)
@@ -1094,6 +1094,8 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          # Use the aws-sdk-go-v2 driver instead of goamz
+          UseAWSS3v2Driver: false
 
           # For S3 driver, potentially unsafe tuning parameter,
           # intentionally excluded from main documentation.
index a54712f330ea2b1ff2a6b8107daec5639c082c32..9cf1ed3cd182ba8f8659b38dee81bcf0a52ab976 100644 (file)
@@ -277,6 +277,7 @@ type S3VolumeDriverParameters struct {
        Bucket             string
        LocationConstraint bool
        V2Signature        bool
+       UseAWSS3v2Driver   bool
        IndexPageSize      int
        ConnectTimeout     Duration
        ReadTimeout        Duration
@@ -552,7 +553,7 @@ func (ss *StringSet) UnmarshalJSON(data []byte) error {
                return err
        }
        *ss = make(map[string]struct{}, len(hash))
-       for t, _ := range hash {
+       for t := range hash {
                (*ss)[t] = struct{}{}
        }
 
index 96f2e7db3965704570f3906c78ab6e624072e013..235d369b5a67f780fb1cb29794ed65294b7a150c 100644 (file)
@@ -32,12 +32,12 @@ import (
 )
 
 func init() {
-       driver["S3"] = newS3Volume
+       driver["S3"] = chooseS3VolumeDriver
 }
 
 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
        v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
-       err := json.Unmarshal(volume.DriverParameters, &v)
+       err := json.Unmarshal(volume.DriverParameters, v)
        if err != nil {
                return nil, err
        }
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
new file mode 100644 (file)
index 0000000..c9fa7fc
--- /dev/null
@@ -0,0 +1,900 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "context"
+       "encoding/base64"
+       "encoding/hex"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "os"
+       "regexp"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/aws/awserr"
+       "github.com/aws/aws-sdk-go-v2/aws/defaults"
+       "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
+       "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
+       "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+)
+
+// S3AWSVolume implements Volume using an S3 bucket.
+type S3AWSVolume struct {
+       arvados.S3VolumeDriverParameters
+       AuthToken      string    // populated automatically when IAMRole is used
+       AuthExpiration time.Time // populated automatically when IAMRole is used
+
+       cluster   *arvados.Cluster
+       volume    arvados.Volume
+       logger    logrus.FieldLogger
+       metrics   *volumeMetricsVecs
+       bucket    *s3AWSbucket
+       region    string
+       startOnce sync.Once
+}
+
+// s3AWSbucket wraps an s3.Client and counts I/O and API usage stats.
+type s3AWSbucket struct {
+       bucket string
+       svc    *s3.Client
+       stats  s3awsbucketStats
+       mu     sync.Mutex
+}
+
+// chooseS3VolumeDriver selects either the old goamz driver or the new
+// aws-sdk-go-v2 driver, based on the UseAWSS3v2Driver feature flag.
+func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+       v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
+       err := json.Unmarshal(volume.DriverParameters, v)
+       if err != nil {
+               return nil, err
+       }
+       if v.UseAWSS3v2Driver {
+               logger.Debugln("Using AWS S3 v2 driver")
+               return newS3AWSVolume(cluster, volume, logger, metrics)
+       } else {
+               logger.Debugln("Using goamz S3 driver")
+               return newS3Volume(cluster, volume, logger, metrics)
+       }
+}
+
+const (
+       PartSize         = 5 * 1024 * 1024
+       ReadConcurrency  = 13
+       WriteConcurrency = 5
+)
+
+var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+var s3AWSZeroTime time.Time
+
+func (v *S3AWSVolume) isKeepBlock(s string) bool {
+       return s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+       v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
+       err := json.Unmarshal(volume.DriverParameters, v)
+       if err != nil {
+               return nil, err
+       }
+       v.logger = logger.WithField("Volume", v.String())
+       return v, v.check("")
+}
+
+func (v *S3AWSVolume) translateError(err error) error {
+       if aerr, ok := err.(awserr.Error); ok {
+               switch aerr.Code() {
+               case "NotFound":
+                       return os.ErrNotExist
+               case "NoSuchKey":
+                       return os.ErrNotExist
+               }
+       }
+       return err
+}
+
+// safeCopy calls CopyObjectRequest, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+//
+// (If something goes wrong during the copy, the error will be embedded in the
+// 200 OK response)
+func (v *S3AWSVolume) safeCopy(dst, src string) error {
+       input := &s3.CopyObjectInput{
+               Bucket:      aws.String(v.bucket.bucket),
+               ContentType: aws.String("application/octet-stream"),
+               CopySource:  aws.String(v.bucket.bucket + "/" + src),
+               Key:         aws.String(dst),
+       }
+
+       req := v.bucket.svc.CopyObjectRequest(input)
+       resp, err := req.Send(context.Background())
+
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
+       }
+
+       if resp.CopyObjectResult.LastModified == nil {
+               return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
+       } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
+               return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+       }
+       return nil
+}
+
+func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+       if v.Bucket == "" {
+               return errors.New("DriverParameters: Bucket must be provided")
+       }
+       if v.IndexPageSize == 0 {
+               v.IndexPageSize = 1000
+       }
+       if v.RaceWindow < 0 {
+               return errors.New("DriverParameters: RaceWindow must not be negative")
+       }
+
+       if v.V2Signature {
+               return errors.New("DriverParameters: V2Signature is not supported")
+       }
+
+       defaultResolver := endpoints.NewDefaultResolver()
+
+       cfg := defaults.Config()
+
+       if v.Endpoint == "" && v.Region == "" {
+               return fmt.Errorf("AWS region or endpoint must be specified")
+       } else if v.Endpoint != "" || ec2metadataHostname != "" {
+               myCustomResolver := func(service, region string) (aws.Endpoint, error) {
+                       if v.Endpoint != "" && service == "s3" {
+                               return aws.Endpoint{
+                                       URL:           v.Endpoint,
+                                       SigningRegion: v.Region,
+                               }, nil
+                       } else if service == "ec2metadata" && ec2metadataHostname != "" {
+                               return aws.Endpoint{
+                                       URL: ec2metadataHostname,
+                               }, nil
+                       }
+
+                       return defaultResolver.ResolveEndpoint(service, region)
+               }
+               cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
+       }
+
+       cfg.Region = v.Region
+
+       // Zero timeouts mean "wait forever", which is a bad
+       // default. Default to long timeouts instead.
+       if v.ConnectTimeout == 0 {
+               v.ConnectTimeout = s3DefaultConnectTimeout
+       }
+       if v.ReadTimeout == 0 {
+               v.ReadTimeout = s3DefaultReadTimeout
+       }
+
+       creds := aws.NewChainProvider(
+               []aws.CredentialsProvider{
+                       aws.NewStaticCredentialsProvider(v.AccessKey, v.SecretKey, v.AuthToken),
+                       ec2rolecreds.New(ec2metadata.New(cfg)),
+               })
+
+       cfg.Credentials = creds
+
+       v.bucket = &s3AWSbucket{
+               bucket: v.Bucket,
+               svc:    s3.New(cfg),
+       }
+
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
+       return nil
+}
+
+// String implements fmt.Stringer.
+func (v *S3AWSVolume) String() string {
+       return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
+}
+
+// GetDeviceID returns a globally unique ID for the storage bucket.
+func (v *S3AWSVolume) GetDeviceID() string {
+       return "s3://" + v.Endpoint + "/" + v.Bucket
+}
+
+// Compare the given data with the stored data.
+func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+       errChan := make(chan error, 1)
+       go func() {
+               _, err := v.Head("recent/" + loc)
+               errChan <- err
+       }()
+       var err error
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       case err = <-errChan:
+       }
+       if err != nil {
+               // Checking for "loc" itself here would interfere with
+               // future GET requests.
+               //
+               // On AWS, if X doesn't exist, a HEAD or GET request
+               // for X causes X's non-existence to be cached. Thus,
+               // if we test for X, then create X and return a
+               // signature to our client, the client might still get
+               // 404 from all keepstores when trying to read it.
+               //
+               // To avoid this, we avoid doing HEAD X or GET X until
+               // we know X has been written.
+               //
+               // Note that X might exist even though recent/X
+               // doesn't: for example, the response to HEAD recent/X
+               // might itself come from a stale cache. In such
+               // cases, we will return a false negative and
+               // PutHandler might needlessly create another replica
+               // on a different volume. That's not ideal, but it's
+               // better than passing the eventually-consistent
+               // problem on to our clients.
+               return v.translateError(err)
+       }
+
+       input := &s3.GetObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       }
+
+       req := v.bucket.svc.GetObjectRequest(input)
+       result, err := req.Send(ctx)
+       if err != nil {
+               return v.translateError(err)
+       }
+       return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
+}
+
+// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
+// and deletes them from the volume.
+func (v *S3AWSVolume) EmptyTrash() {
+       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+               return
+       }
+
+       var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+       // Define "ready to delete" as "...when EmptyTrash started".
+       startT := time.Now()
+
+       emptyOneKey := func(trash *s3.Object) {
+               loc := strings.TrimPrefix(*trash.Key, "trash/")
+               if !v.isKeepBlock(loc) {
+                       return
+               }
+               atomic.AddInt64(&bytesInTrash, *trash.Size)
+               atomic.AddInt64(&blocksInTrash, 1)
+
+               trashT := *(trash.LastModified)
+               recent, err := v.Head("recent/" + loc)
+               if err != nil && os.IsNotExist(v.translateError(err)) {
+                       v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
+                       err = v.Untrash(loc)
+                       if err != nil {
+                               v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
+                       }
+                       return
+               } else if err != nil {
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+                       return
+               }
+               if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
+                       if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
+                               // recent/loc is too old to protect
+                               // loc from being Trashed again during
+                               // the raceWindow that starts if we
+                               // delete trash/X now.
+                               //
+                               // Note this means (TrashSweepInterval
+                               // < BlobSigningTTL - raceWindow) is
+                               // necessary to avoid starvation.
+                               v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
+                               v.fixRace(loc)
+                               v.Touch(loc)
+                               return
+                       }
+                       _, err := v.Head(loc)
+                       if os.IsNotExist(err) {
+                               v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
+                               v.fixRace(loc)
+                               return
+                       } else if err != nil {
+                               v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+                               return
+                       }
+               }
+               if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
+                       return
+               }
+               err = v.bucket.Del(*trash.Key)
+               if err != nil {
+                       v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
+                       return
+               }
+               atomic.AddInt64(&bytesDeleted, *trash.Size)
+               atomic.AddInt64(&blocksDeleted, 1)
+
+               _, err = v.Head(loc)
+               if err == nil {
+                       v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+                       return
+               }
+               if !os.IsNotExist(v.translateError(err)) {
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+                       return
+               }
+               err = v.bucket.Del("recent/" + loc)
+               if err != nil {
+                       v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+               }
+       }
+
+       var wg sync.WaitGroup
+       todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+       for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for key := range todo {
+                               emptyOneKey(key)
+                       }
+               }()
+       }
+
+       trashL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   "trash/",
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+               todo <- trash
+       }
+       close(todo)
+       wg.Wait()
+
+       if err := trashL.Error(); err != nil {
+               v.logger.WithError(err).Error("EmptyTrash: lister failed")
+       }
+       v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3AWSVolume) fixRace(loc string) bool {
+       trash, err := v.Head("trash/" + loc)
+       if err != nil {
+               if !os.IsNotExist(v.translateError(err)) {
+                       v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+               }
+               return false
+       }
+
+       recent, err := v.Head("recent/" + loc)
+       if err != nil {
+               v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+               return false
+       }
+
+       recentTime := *recent.LastModified
+       trashTime := *trash.LastModified
+       ageWhenTrashed := trashTime.Sub(recentTime)
+       if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
+               // No evidence of a race: block hasn't been written
+               // since it became eligible for Trash. No fix needed.
+               return false
+       }
+
+       v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+       v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+       err = v.safeCopy(loc, "trash/"+loc)
+       if err != nil {
+               v.logger.WithError(err).Error("fixRace: copy failed")
+               return false
+       }
+       return true
+}
+
+func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+       input := &s3.HeadObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       }
+
+       req := v.bucket.svc.HeadObjectRequest(input)
+       res, err := req.Send(context.TODO())
+
+       v.bucket.stats.TickOps("head")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
+       v.bucket.stats.TickErr(err)
+
+       if err != nil {
+               return nil, v.translateError(err)
+       }
+       result = res.HeadObjectOutput
+       return
+}
+
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
+func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       return getWithPipe(ctx, loc, buf, v)
+}
+
+func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+       buf := make([]byte, 0, 67108864)
+       awsBuf := aws.NewWriteAtBuffer(buf)
+
+       downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+               u.PartSize = PartSize
+               u.Concurrency = ReadConcurrency
+       })
+
+       v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
+
+       _, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       })
+       v.bucket.stats.TickOps("get")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
+       v.bucket.stats.TickErr(err)
+       if err != nil {
+               return nil, v.translateError(err)
+       }
+       buf = awsBuf.Bytes()
+
+       rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes)
+       return
+}
+
+// ReadBlock implements BlockReader.
+func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
+       rdr, err := v.readWorker(ctx, loc)
+
+       if err == nil {
+               _, err2 := io.Copy(w, rdr)
+               if err2 != nil {
+                       return err2
+               }
+               return err
+       }
+
+       err = v.translateError(err)
+       if !os.IsNotExist(err) {
+               return err
+       }
+
+       _, err = v.Head("recent/" + loc)
+       err = v.translateError(err)
+       if err != nil {
+               // If we can't read recent/X, there's no point in
+               // trying fixRace. Give up.
+               return err
+       }
+       if !v.fixRace(loc) {
+               err = os.ErrNotExist
+               return err
+       }
+
+       rdr, err = v.readWorker(ctx, loc)
+       if err != nil {
+               v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+               err = v.translateError(err)
+               return err
+       }
+
+       _, err = io.Copy(w, rdr)
+
+       return err
+}
+
+func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader) error {
+       if r == nil {
+               // r == nil leads to a memory violation in func readFillBuf in
+               // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
+               r = bytes.NewReader(nil)
+       }
+
+       uploadInput := s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(name),
+               Body:   r,
+       }
+
+       if len(name) == 32 {
+               var contentMD5 string
+               md5, err := hex.DecodeString(name)
+               if err != nil {
+                       return err
+               }
+               contentMD5 = base64.StdEncoding.EncodeToString(md5)
+               uploadInput.ContentMD5 = &contentMD5
+       }
+
+       // Experimentation indicated that using concurrency 5 yields the best
+       // throughput, better than higher concurrency (10 or 13) by ~5%.
+       // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
+       // is detrimental to throughput (minus ~15%).
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+               u.PartSize = PartSize
+               u.Concurrency = WriteConcurrency
+       })
+
+       // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
+       // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
+       // block, so there is no extra memory use to be concerned about. See
+       // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
+       // calculating the Sha-256 because we don't need it; we already use md5sum
+       // hashes that match the name of the block.
+       _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+               r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+       }))
+
+       v.bucket.stats.TickOps("put")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+       v.bucket.stats.TickErr(err)
+
+       return err
+}
+
+// Put writes a block.
+func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+       return putWithPipe(ctx, loc, block, v)
+}
+
+// WriteBlock implements BlockWriter.
+func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+
+       r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+       err := v.writeObject(ctx, loc, r)
+       if err != nil {
+               return err
+       }
+       return v.writeObject(ctx, "recent/"+loc, nil)
+}
+
+type s3awsLister struct {
+       Logger            logrus.FieldLogger
+       Bucket            *s3AWSbucket
+       Prefix            string
+       PageSize          int
+       Stats             *s3awsbucketStats
+       ContinuationToken string
+       buf               []s3.Object
+       err               error
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3awsLister) First() *s3.Object {
+       lister.getPage()
+       return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3awsLister) Next() *s3.Object {
+       if len(lister.buf) == 0 && lister.ContinuationToken != "" {
+               lister.getPage()
+       }
+       return lister.pop()
+}
+
+// Return the most recent error encountered by First or Next.
+func (lister *s3awsLister) Error() error {
+       return lister.err
+}
+
+func (lister *s3awsLister) getPage() {
+       lister.Stats.TickOps("list")
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
+
+       var input *s3.ListObjectsV2Input
+       if lister.ContinuationToken == "" {
+               input = &s3.ListObjectsV2Input{
+                       Bucket:  aws.String(lister.Bucket.bucket),
+                       MaxKeys: aws.Int64(int64(lister.PageSize)),
+                       Prefix:  aws.String(lister.Prefix),
+               }
+       } else {
+               input = &s3.ListObjectsV2Input{
+                       Bucket:            aws.String(lister.Bucket.bucket),
+                       MaxKeys:           aws.Int64(int64(lister.PageSize)),
+                       Prefix:            aws.String(lister.Prefix),
+                       ContinuationToken: &lister.ContinuationToken,
+               }
+       }
+
+       req := lister.Bucket.svc.ListObjectsV2Request(input)
+       resp, err := req.Send(context.Background())
+       if err != nil {
+               if aerr, ok := err.(awserr.Error); ok {
+                       lister.err = aerr
+               } else {
+                       lister.err = err
+               }
+               return
+       }
+
+       if *resp.IsTruncated {
+               lister.ContinuationToken = *resp.NextContinuationToken
+       } else {
+               lister.ContinuationToken = ""
+       }
+       lister.buf = make([]s3.Object, 0, len(resp.Contents))
+       for _, key := range resp.Contents {
+               if !strings.HasPrefix(*key.Key, lister.Prefix) {
+                       lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
+                       continue
+               }
+               lister.buf = append(lister.buf, key)
+       }
+}
+
+func (lister *s3awsLister) pop() (k *s3.Object) {
+       if len(lister.buf) > 0 {
+               k = &lister.buf[0]
+               lister.buf = lister.buf[1:]
+       }
+       return
+}
+
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
+func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+       // Use a merge sort to find matching sets of X and recent/X.
+       dataL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   prefix,
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       recentL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   "recent/" + prefix,
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
+               if *data.Key >= "g" {
+                       // Conveniently, "recent/*" and "trash/*" are
+                       // lexically greater than all hex-encoded data
+                       // hashes, so stopping here avoids iterating
+                       // over all of them needlessly with dataL.
+                       break
+               }
+               if !v.isKeepBlock(*data.Key) {
+                       continue
+               }
+
+               // stamp is the list entry we should use to report the
+               // last-modified time for this data block: it will be
+               // the recent/X entry if one exists, otherwise the
+               // entry for the data block itself.
+               stamp := data
+
+               // Advance to the corresponding recent/X marker, if any
+               for recent != nil && recentL.Error() == nil {
+                       if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
+                               recent = recentL.Next()
+                               continue
+                       } else if cmp == 0 {
+                               stamp = recent
+                               recent = recentL.Next()
+                               break
+                       } else {
+                               // recent/X marker is missing: we'll
+                               // use the timestamp on the data
+                               // object.
+                               break
+                       }
+               }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
+               fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.UnixNano())
+       }
+       return dataL.Error()
+}
+
+// Mtime returns the stored timestamp for the given locator.
+func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+       _, err := v.Head(loc)
+       if err != nil {
+               return s3AWSZeroTime, v.translateError(err)
+       }
+       resp, err := v.Head("recent/" + loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               // The data object X exists, but recent/X is missing.
+               err = v.writeObject(context.Background(), "recent/"+loc, nil)
+               if err != nil {
+                       v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+                       return s3AWSZeroTime, v.translateError(err)
+               }
+               v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+               resp, err = v.Head("recent/" + loc)
+               if err != nil {
+                       v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+                       return s3AWSZeroTime, v.translateError(err)
+               }
+       } else if err != nil {
+               // HEAD recent/X failed for some other reason.
+               return s3AWSZeroTime, err
+       }
+       return *resp.LastModified, err
+}
+
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
+func (v *S3AWSVolume) Status() *VolumeStatus {
+       return &VolumeStatus{
+               DeviceNum: 1,
+               BytesFree: BlockSize * 1000,
+               BytesUsed: 1,
+       }
+}
+
+// InternalStats returns bucket I/O and API call counters.
+func (v *S3AWSVolume) InternalStats() interface{} {
+       return &v.bucket.stats
+}
+
+// Touch sets the timestamp for the given locator to the current time.
+func (v *S3AWSVolume) Touch(loc string) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+       _, err := v.Head(loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) && v.fixRace(loc) {
+               // The data object got trashed in a race, but fixRace
+               // rescued it.
+       } else if err != nil {
+               return err
+       }
+       err = v.writeObject(context.Background(), "recent/"+loc, nil)
+       return v.translateError(err)
+}
+
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3AWSVolume) checkRaceWindow(loc string) error {
+       resp, err := v.Head("trash/" + loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               // OK, trash/X doesn't exist so we're not in the race
+               // window
+               return nil
+       } else if err != nil {
+               // Error looking up trash/X. We don't know whether
+               // we're in the race window
+               return err
+       }
+       t := resp.LastModified
+       safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
+       if safeWindow <= 0 {
+               // We can't count on "touch trash/X" to prolong
+               // trash/X's lifetime. The new timestamp might not
+               // become visible until now+raceWindow, and EmptyTrash
+               // is allowed to delete trash/X before then.
+               return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+       }
+       // trash/X exists, but it won't be eligible for deletion until
+       // after now+raceWindow, so it's safe to overwrite it.
+       return nil
+}
+
+func (b *s3AWSbucket) Del(path string) error {
+       input := &s3.DeleteObjectInput{
+               Bucket: aws.String(b.bucket),
+               Key:    aws.String(path),
+       }
+       req := b.svc.DeleteObjectRequest(input)
+       _, err := req.Send(context.Background())
+       //err := b.Bucket().Del(path)
+       b.stats.TickOps("delete")
+       b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
+       b.stats.TickErr(err)
+       return err
+}
+
+// Trash a Keep block.
+func (v *S3AWSVolume) Trash(loc string) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+       if t, err := v.Mtime(loc); err != nil {
+               return err
+       } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
+               return nil
+       }
+       if v.cluster.Collections.BlobTrashLifetime == 0 {
+               if !v.UnsafeDelete {
+                       return ErrS3TrashDisabled
+               }
+               return v.translateError(v.bucket.Del(loc))
+       }
+       err := v.checkRaceWindow(loc)
+       if err != nil {
+               return err
+       }
+       err = v.safeCopy("trash/"+loc, loc)
+       if err != nil {
+               return err
+       }
+       return v.translateError(v.bucket.Del(loc))
+}
+
+// Untrash moves block from trash back into store
+func (v *S3AWSVolume) Untrash(loc string) error {
+       err := v.safeCopy(loc, "trash/"+loc)
+       if err != nil {
+               return err
+       }
+       err = v.writeObject(context.Background(), "recent/"+loc, nil)
+       return v.translateError(err)
+}
+
+type s3awsbucketStats struct {
+       statsTicker
+       Ops     uint64
+       GetOps  uint64
+       PutOps  uint64
+       HeadOps uint64
+       DelOps  uint64
+       ListOps uint64
+}
+
+func (s *s3awsbucketStats) TickErr(err error) {
+       if err == nil {
+               return
+       }
+       errType := fmt.Sprintf("%T", err)
+       if aerr, ok := err.(awserr.Error); ok {
+               if reqErr, ok := err.(awserr.RequestFailure); ok {
+                       // A service error occurred
+                       errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+               } else {
+                       errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+               }
+       }
+       s.statsTicker.TickErr(err, errType)
+}
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
new file mode 100644 (file)
index 0000000..97045a6
--- /dev/null
@@ -0,0 +1,657 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "context"
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "strings"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+
+       "github.com/johannesboyne/gofakes3"
+       "github.com/johannesboyne/gofakes3/backend/s3mem"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+const (
+       S3AWSTestBucketName = "testbucket"
+)
+
+type s3AWSFakeClock struct {
+       now *time.Time
+}
+
+func (c *s3AWSFakeClock) Now() time.Time {
+       if c.now == nil {
+               return time.Now().UTC()
+       }
+       return c.now.UTC()
+}
+
+func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+       return c.Now().Sub(t)
+}
+
+var _ = check.Suite(&StubbedS3AWSSuite{})
+
+var srv httptest.Server
+
+type StubbedS3AWSSuite struct {
+       s3server *httptest.Server
+       metadata *httptest.Server
+       cluster  *arvados.Cluster
+       handler  *handler
+       volumes  []*TestableS3AWSVolume
+}
+
+func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+       s.s3server = nil
+       s.metadata = nil
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
+               "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
+       }
+       s.handler = &handler{}
+}
+
+func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               // Use a negative raceWindow so s3test's 1-second
+               // timestamp precision doesn't confuse fixRace.
+               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       })
+}
+
+func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+       DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       })
+}
+
+func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+       v.IndexPageSize = 3
+       for i := 0; i < 256; i++ {
+               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+       }
+       for _, spec := range []struct {
+               prefix      string
+               expectMatch int
+       }{
+               {"", 256},
+               {"c", 16},
+               {"bc", 1},
+               {"abc", 0},
+       } {
+               buf := new(bytes.Buffer)
+               err := v.IndexTo(spec.prefix, buf)
+               c.Check(err, check.IsNil)
+
+               idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
+               c.Check(len(idx), check.Equals, spec.expectMatch+1)
+               c.Check(len(idx[len(idx)-1]), check.Equals, 0)
+       }
+}
+
+func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+       var header http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header
+       }))
+       defer stub.Close()
+
+       // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
+       // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+       vol := S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKey: "xxx",
+                       SecretKey: "xxx",
+                       Endpoint:  stub.URL,
+                       Region:    "test-region-1",
+                       Bucket:    "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := vol.check("")
+       // Our test S3 server uses the older 'Path Style'
+       vol.bucket.svc.ForcePathStyle = true
+
+       c.Check(err, check.IsNil)
+       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+               exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+               // Literal example from
+               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+               // but with updated timestamps
+               io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+       }))
+       defer s.metadata.Close()
+
+       v := &S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/latest/api/token",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := v.check(s.metadata.URL + "/latest")
+       c.Check(err, check.IsNil)
+       creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       c.Check(err, check.IsNil)
+       c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
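+       // A metadata endpoint that answers 404 should make credential
+       // retrieval fail.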
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               w.WriteHeader(http.StatusNotFound)
+       }))
+       deadv := &S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err = deadv.check(s.metadata.URL + "/latest")
+       _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+       c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+}
+
+func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       stats := func() string {
+               buf, err := json.Marshal(v.InternalStats())
+               c.Check(err, check.IsNil)
+               return string(buf)
+       }
+
+       c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.NotNil)
+       c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
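+       // Putting a block stores both the data object and its recent/
+       // marker, so a single Put is counted as two PutOps.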
+       err = v.Put(context.Background(), loc, []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+       c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
+
+       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
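+// s3AWSBlockingHandler is a stub S3 endpoint that reports each
+// incoming request on the requested channel (if set) and then blocks
+// until unblock is closed, so tests can cancel a context while a
+// request is still in flight.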
+type s3AWSBlockingHandler struct {
+       requested chan *http.Request
+       unblock   chan struct{}
+}
+
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
+               // Accept PutBucket ("PUT /bucketname/"), called by
+               // newTestableVolume
+               return
+       }
+       if h.requested != nil {
+               h.requested <- r
+       }
+       if h.unblock != nil {
+               <-h.unblock
+       }
+       http.Error(w, "nothing here", http.StatusNotFound)
+}
+
+func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       buf := make([]byte, 3)
+
+       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+               _, err := v.Get(ctx, loc, buf)
+               return err
+       })
+}
+
+func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       buf := []byte("bar")
+
+       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+               return v.Compare(ctx, loc, buf)
+       })
+}
+
+func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       buf := []byte("foo")
+
+       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+               return v.Put(ctx, loc, buf)
+       })
+}
+
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+       handler := &s3AWSBlockingHandler{}
+       s.s3server = httptest.NewServer(handler)
+       defer s.s3server.Close()
+
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+
+       ctx, cancel := context.WithCancel(context.Background())
+
+       handler.requested = make(chan *http.Request)
+       handler.unblock = make(chan struct{})
+       defer close(handler.unblock)
+
+       doneFunc := make(chan struct{})
+       go func() {
+               err := testFunc(ctx, v)
+               c.Check(err, check.Equals, context.Canceled)
+               close(doneFunc)
+       }()
+
+       timeout := time.After(10 * time.Second)
+
+       // Wait for the stub server to receive a request, meaning the
+       // volume operation under test is blocked on an S3 request.
+       select {
+       case <-timeout:
+               c.Fatal("timed out waiting for test func to call our handler")
+       case <-doneFunc:
+               c.Fatal("test func finished without even calling our handler!")
+       case <-handler.requested:
+       }
+
+       cancel()
+
+       select {
+       case <-timeout:
+               c.Fatal("timed out")
+       case <-doneFunc:
+       }
+}
+
+func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+       s.cluster.Collections.BlobTrashLifetime.Set("1h")
+       s.cluster.Collections.BlobSigningTTL.Set("1h")
+
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       var none time.Time
+
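+       // putS3Obj writes an object directly to the stub server,
+       // backdating the fake clock to the given timestamp. A zero
+       // timestamp means "object absent" and is skipped.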
+       putS3Obj := func(t time.Time, key string, data []byte) {
+               if t == none {
+                       return
+               }
+               v.serverClock.now = &t
+               uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+               _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+                       Bucket: aws.String(v.bucket.bucket),
+                       Key:    aws.String(key),
+                       Body:   bytes.NewReader(data),
+               })
+               if err != nil {
+                       panic(err)
+               }
+               v.serverClock.now = nil
+               _, err = v.Head(key)
+               if err != nil {
+                       panic(err)
+               }
+       }
+
+       t0 := time.Now()
+       nextKey := 0
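+       // Each scenario fixes the timestamps of the data object, its
+       // recent/ marker, and its trash/ copy (none = absent), and
+       // lists the expected outcomes of Get, Trash, Untrash, and
+       // EmptyTrash.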
+       for _, scenario := range []struct {
+               label               string
+               dataT               time.Time
+               recentT             time.Time
+               trashT              time.Time
+               canGet              bool
+               canTrash            bool
+               canGetAfterTrash    bool
+               canUntrash          bool
+               haveTrashAfterEmpty bool
+               freshAfterEmpty     bool
+       }{
+               {
+                       "No related objects",
+                       none, none, none,
+                       false, false, false, false, false, false,
+               },
+               {
+                       // Stored by older version, or there was a
+                       // race between EmptyTrash and Put: Trash is a
+                       // no-op even though the data object is very
+                       // old
+                       "No recent/X",
+                       t0.Add(-48 * time.Hour), none, none,
+                       true, true, true, false, false, false,
+               },
+               {
+                       "Not trash, but old enough to be eligible for trash",
+                       t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+                       true, true, false, false, false, false,
+               },
+               {
+                       "Not trash, and not old enough to be eligible for trash",
+                       t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+                       true, true, true, false, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, due to recent race between Trash and Put",
+                       t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+                       true, true, true, true, true, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+                       true, false, true, true, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+                       true, false, true, true, true, true,
+               },
+               {
+                       "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
+                       t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+                       true, true, true, true, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
+                       t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+                       true, false, true, true, false, false,
+               },
+               {
+                       "Trash, not yet eligible for deletion",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+                       false, false, false, true, true, false,
+               },
+               {
+                       "Trash, not yet eligible for deletion, prone to races",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+                       false, false, false, true, true, false,
+               },
+               {
+                       "Trash, eligible for deletion",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+                       false, false, false, true, false, false,
+               },
+               {
+                       "Erroneously trashed during a race, detected before BlobTrashLifetime",
+                       none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
+                       none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+                       none, none, t0.Add(-time.Minute),
+                       false, false, false, true, true, true,
+               },
+       } {
+               c.Log("Scenario: ", scenario.label)
+
+               // We have a few tests to run for each scenario, and
+               // the tests are expected to change state. By calling
+               // this setup func between tests, we (re)create the
+               // scenario as specified, using a new unique block
+               // locator to prevent interference from previous
+               // tests.
+
+               setupScenario := func() (string, []byte) {
+                       nextKey++
+                       blk := []byte(fmt.Sprintf("%d", nextKey))
+                       loc := fmt.Sprintf("%x", md5.Sum(blk))
+                       c.Log("\t", loc)
+                       putS3Obj(scenario.dataT, loc, blk)
+                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
+                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
+                       v.serverClock.now = &t0
+                       return loc, blk
+               }
+
+               // Check canGet
+               loc, blk := setupScenario()
+               buf := make([]byte, len(blk))
+               _, err := v.Get(context.Background(), loc, buf)
+               c.Check(err == nil, check.Equals, scenario.canGet)
+               if err != nil {
+                       c.Check(os.IsNotExist(err), check.Equals, true)
+               }
+
+               // Call Trash, then check canTrash and canGetAfterTrash
+               loc, _ = setupScenario()
+               err = v.Trash(loc)
+               c.Check(err == nil, check.Equals, scenario.canTrash)
+               _, err = v.Get(context.Background(), loc, buf)
+               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+               if err != nil {
+                       c.Check(os.IsNotExist(err), check.Equals, true)
+               }
+
+               // Call Untrash, then check canUntrash
+               loc, _ = setupScenario()
+               err = v.Untrash(loc)
+               c.Check(err == nil, check.Equals, scenario.canUntrash)
+               if scenario.dataT != none || scenario.trashT != none {
+                       // In all scenarios where the data exists, we
+                       // should be able to Get after Untrash --
+                       // regardless of timestamps, errors, race
+                       // conditions, etc.
+                       _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err, check.IsNil)
+               }
+
+               // Call EmptyTrash, then check haveTrashAfterEmpty and
+               // freshAfterEmpty
+               loc, _ = setupScenario()
+               v.EmptyTrash()
+               _, err = v.Head("trash/" + loc)
+               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+               if scenario.freshAfterEmpty {
+                       t, err := v.Mtime(loc)
+                       c.Check(err, check.IsNil)
+                       // new mtime must be current (with an
+                       // allowance for 1s timestamp precision)
+                       c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+               }
+
+               // Check for current Mtime after Put (applies to all
+               // scenarios)
+               loc, blk = setupScenario()
+               err = v.Put(context.Background(), loc, blk)
+               c.Check(err, check.IsNil)
+               t, err := v.Mtime(loc)
+               c.Check(err, check.IsNil)
+               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+       }
+}
+
+type TestableS3AWSVolume struct {
+       *S3AWSVolume
+       server      *httptest.Server
+       c           *check.C
+       serverClock *s3AWSFakeClock
+}
+
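+// LogrusLog adapts a logrus FieldLogger to gofakes3's logging
+// interface; it can be passed to gofakes3.WithLogger() when debugging
+// the fake S3 backend.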
+type LogrusLog struct {
+       log logrus.FieldLogger
+}
+
+func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+       switch level {
+       case gofakes3.LogErr:
+               l.log.Errorln(v...)
+       case gofakes3.LogWarn:
+               l.log.Warnln(v...)
+       case gofakes3.LogInfo:
+               l.log.Infoln(v...)
+       default:
+               panic("unknown level")
+       }
+}
+
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+
+       clock := &s3AWSFakeClock{}
+       // fake s3
+       backend := s3mem.New(s3mem.WithTimeSource(clock))
+
+       // To enable GoFakeS3 debug logging, construct a LogrusLog and
+       // pass it to gofakes3.WithLogger() below, e.g.:
+       //   logger := LogrusLog{log: ctxlog.FromContext(context.Background())}
+       faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
+       srv := httptest.NewServer(faker.Server())
+
+       endpoint := srv.URL
+       if s.s3server != nil {
+               endpoint = s.s3server.URL
+       }
+
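+       // If the fake metadata server is running, exercise the IAM
+       // role code path instead of static credentials.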
+       iamRole, accessKey, secretKey := "", "xxx", "xxx"
+       if s.metadata != nil {
+               iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+       }
+
+       v := &TestableS3AWSVolume{
+               S3AWSVolume: &S3AWSVolume{
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               IAMRole:            iamRole,
+                               AccessKey:          accessKey,
+                               SecretKey:          secretKey,
+                               Bucket:             S3AWSTestBucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster: cluster,
+                       volume:  volume,
+                       logger:  ctxlog.TestLogger(c),
+                       metrics: metrics,
+               },
+               c:           c,
+               server:      srv,
+               serverClock: clock,
+       }
+       c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+       // Our test S3 server uses the older path-style addressing.
+       v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+       // Create the testbucket
+       input := &s3.CreateBucketInput{
+               Bucket: aws.String(S3AWSTestBucketName),
+       }
+       req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+       _, err := req.Send(context.Background())
+       c.Assert(err, check.IsNil)
+       // We couldn't set RaceWindow until now because check()
+       // rejects negative values.
+       v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+       return v
+}
+
+// PutRaw stores a block on the stub server directly, skipping the
+// ContentMD5 check done by the regular Put path.
+func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+
+       r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 13
+       })
+
+       _, err := uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+               Body:   r,
+       })
+       if err != nil {
+               v.logger.Printf("PutRaw: %s: %+v", loc, err)
+       }
+
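+       // Write the corresponding recent/ marker, as the real Put does.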
+       empty := bytes.NewReader([]byte{})
+       _, err = uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + loc),
+               Body:   empty,
+       })
+       if err != nil {
+               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+       }
+}
+
+// TouchWithDate turns back the clock while doing a Touch(). We assume
+// there are no other operations happening on the same stub S3 server
+// while we do this.
+func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+       v.serverClock.now = &lastPut
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+       empty := bytes.NewReader([]byte{})
+       _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + locator),
+               Body:   empty,
+       })
+       if err != nil {
+               panic(err)
+       }
+
+       v.serverClock.now = nil
+}
+
+func (v *TestableS3AWSVolume) Teardown() {
+       v.server.Close()
+}
+
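+// ReadWriteOperationLabelValues returns the operation labels the
+// generic volume tests expect to find on this driver's read and write
+// request counters.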
+func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "put"
+}