10477: first version of the aws-sdk-go-v2 driver.
authorWard Vandewege <ward@curii.com>
Wed, 22 Jul 2020 11:10:14 +0000 (07:10 -0400)
committerWard Vandewege <ward@curii.com>
Mon, 3 Aug 2020 16:18:39 +0000 (12:18 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

doc/install/configure-s3-object-storage.html.textile.liquid
go.mod
go.sum
lib/config/config.default.yml
lib/config/generated_config.go
sdk/go/arvados/config.go
services/keepstore/s3_volume.go
services/keepstore/s3aws_volume.go [new file with mode: 0644]
services/keepstore/s3aws_volume_test.go [new file with mode: 0644]

index b960ac1fda0c2ab1fbaae77e4ae3c875b8dec0bc..cfd436dd6f4b660f383851ae1249ebb50d2886a1 100644 (file)
@@ -64,6 +64,9 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
           # might be needed for other S3-compatible services.
           V2Signature: false
 
+          # Use the aws-sdk-go-v2 driver instead of the goamz driver.
+          AlternateDriver: false
+
           # Requested page size for "list bucket contents" requests.
           IndexPageSize: 1000
 
@@ -94,3 +97,21 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
         # classes" in the "Admin" section of doc.arvados.org.
         StorageClasses: null
 </code></pre></notextile>
+
+Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @AlternateDriver@ flag to @true@.
+
+The @aws-sdk-go-v2@ driver does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third-party S3-compatible service.
+
+The @aws-sdk-go-v2@ driver has faster _single thread_ read and write performance than the @goamz@ driver. Here are some benchmark numbers against AWS S3, as measured in July 2020. They were generated with the @keep-exercise@ tool in an Arvados installation with one dedicated Keepstore node (c5n.2xlarge) and one dedicated node for running @keep-exercise@ (c5n.2xlarge). The Keepstore node was backed by one S3 bucket, in a VPC with an S3 endpoint installed. Versioning, Server Access logging, Static website hosting, Object-level logging and Default encryption were disabled. Object lock, Transfer acceleration and Requester pays were also disabled. There were no Active notifications. Each test consisted of four one-minute runs, which were averaged into one number. The tests were repeated 3 times, and of those 3 runs, the highest average speed was selected and included in the table below.
+
+table(table table-bordered table-condensed).
+||_. goamz |_. aws-sdk-go-v2 |_. command line|
+|single thread read performance (average)|32.53 MiB/s|79.48 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
+|single thread write performance (average)|39.75 MiB/s|49.58 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
+
+Because both S3 and Keep are optimized for _aggregate_ throughput, the single thread performance is not as important as it may seem at first glance. When using 20 concurrent read or write threads, the numbers from both drivers are more closely aligned:
+
+table(table table-bordered table-condensed).
+||_. goamz |_. aws-sdk-go-v2 |_. command line|
+|20 thread read performance (average)|585.60 MiB/s|898.93 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 20 -wthreads 0|
+|20 thread write performance (average)|610.40 MiB/s|542.40 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 0 -wthreads 20|
diff --git a/go.mod b/go.mod
index 884d1fcdac8637e77fb349aa569ad8fcbb5b7924..71052882adbeff703ae81a21900561afe15c8743 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
        github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
        github.com/arvados/cgofuse v1.2.0-arvados1
        github.com/aws/aws-sdk-go v1.25.30
+       github.com/aws/aws-sdk-go-v2 v0.23.0
        github.com/bgentry/speakeasy v0.1.0 // indirect
        github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
        github.com/coreos/go-oidc v2.1.0+incompatible
@@ -37,6 +38,7 @@ require (
        github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
        github.com/jmcvetta/randutil v0.0.0-20150817122601-2bb1b664bcff
        github.com/jmoiron/sqlx v1.2.0
+       github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc
        github.com/julienschmidt/httprouter v1.2.0
        github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7 // indirect
        github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
@@ -58,7 +60,7 @@ require (
        github.com/stretchr/testify v1.4.0 // indirect
        github.com/xanzy/ssh-agent v0.1.0 // indirect
        golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
-       golang.org/x/net v0.0.0-20190620200207-3b0461eec859
+       golang.org/x/net v0.0.0-20200202094626-16171245cfb2
        golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
        golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
        google.golang.org/api v0.13.0
diff --git a/go.sum b/go.sum
index ead655c9b276164c2a7e9766ea57d7b96ef3f82a..2565964e7d45121d76e59e0c7b5e21743beaaa28 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -21,14 +21,18 @@ github.com/arvados/cgofuse v1.2.0-arvados1 h1:4Q4vRJ4hbTCcI4gGEaa6hqwj3rqlUuzeFQ
 github.com/arvados/cgofuse v1.2.0-arvados1/go.mod h1:79WFV98hrkRHK9XPhh2IGGOwpFSjocsWubgxAs2KhRc=
 github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef h1:cl7DIRbiAYNqaVxg3CZY8qfZoBOKrj06H/x9SPGaxas=
 github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef/go.mod h1:rCtgyMmBGEbjTm37fCuBYbNL0IhztiALzo3OB9HyiOM=
+github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
 github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
+github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
 github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092 h1:0Di2onNnlN5PAyWPbqlPyN45eOQ+QW/J9eqLynt4IV4=
 github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092/go.mod h1:8IzBjZCRSnsvM6MJMG8HNNtnzMl48H22rbJL2kRUJ0Y=
 github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
@@ -74,7 +78,7 @@ github.com/go-ldap/ldap v3.0.3+incompatible h1:HTeSZO8hWMS1Rgb2Ziku6b8a7qRIZZMHj
 github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
-github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -91,6 +95,7 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -114,6 +119,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
 github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc/go.mod h1:fNiSoOiEI5KlkWXn26OwKnNe58ilTIkpBlgOrt7Olu8=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
@@ -171,13 +178,17 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
 github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
 github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5 h1:Jw7W4WMfQDxsXvfeFSaS2cHlY7bAF4MGrgnbd0+Uo78=
 github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
+github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63/go.mod h1:n+VKSARF5y/tS9XFSP7vWDfS+GUC5vs/YT7M5XDTUEM=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
 github.com/src-d/gcfg v1.3.0 h1:2BEDr8r0I0b8h/fOqwtxCEiq2HJu8n2JGZJQFGXWLjg=
 github.com/src-d/gcfg v1.3.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -206,6 +217,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190310074541-c10a0554eabf/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
@@ -214,6 +226,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowK
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
@@ -227,6 +240,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190310054646-10058d7d4faa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -242,10 +256,12 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190308174544-00c44ba9c14f/go.mod h1:25r3+/G6/xytQM8iWZKq3Hn0kr0rgFKPUNVEL/dr3z4=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c h1:97SnQk1GYRXJgvwZ8fadnxDOWfKvkNQHH3CtZntPSrM=
 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
 google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
@@ -267,6 +283,7 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
 gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
 gopkg.in/src-d/go-billy.v4 v4.0.1 h1:iMxwQPj2cuKRyaIZ985zxClkcdTtT5VpXYf4PTJc0Ek=
index 907acdc87847f9c052aee71c5e1d1fbe8c4f78aa..d2ccefe8b0b106a08d4a00c7f861fd1e159c17e0 100644 (file)
@@ -1072,6 +1072,8 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          # Use the aws-sdk-go-v2 driver instead of goamz
+          AlternateDriver: false
 
           # For S3 driver, potentially unsafe tuning parameter,
           # intentionally excluded from main documentation.
index 96da19dfcdc14c6e20f0d1ea348c2423f909b1ba..d6d1984292f9aa9f1c75bee6eedb81a6bf8e40d1 100644 (file)
@@ -1078,6 +1078,8 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          # Use the aws-sdk-go-v2 driver instead of goamz
+          AlternateDriver: false
 
           # For S3 driver, potentially unsafe tuning parameter,
           # intentionally excluded from main documentation.
index a54712f330ea2b1ff2a6b8107daec5639c082c32..515dc7973e2acf5e8b755564e2b0d17a0609d357 100644 (file)
@@ -277,6 +277,7 @@ type S3VolumeDriverParameters struct {
        Bucket             string
        LocationConstraint bool
        V2Signature        bool
+       AlternateDriver    bool
        IndexPageSize      int
        ConnectTimeout     Duration
        ReadTimeout        Duration
index 96f2e7db3965704570f3906c78ab6e624072e013..8e32e592bd7e1fc8f0e5a513184e1df179f248c2 100644 (file)
@@ -32,7 +32,7 @@ import (
 )
 
 func init() {
-       driver["S3"] = newS3Volume
+       driver["S3"] = chooseS3VolumeDriver
 }
 
 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
new file mode 100644 (file)
index 0000000..d4fe9fd
--- /dev/null
@@ -0,0 +1,921 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "context"
+       "encoding/base64"
+       "encoding/hex"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "os"
+       "regexp"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/aws/awserr"
+       "github.com/aws/aws-sdk-go-v2/aws/defaults"
+       "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
+       "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
+       "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+)
+
+// S3AWSVolume implements Volume using an S3 bucket, accessed through
+// the aws-sdk-go-v2 client. chooseS3VolumeDriver selects it instead of
+// the goamz-based S3Volume when DriverParameters.AlternateDriver is
+// true.
+type S3AWSVolume struct {
+       // Decoded from the volume's DriverParameters JSON.
+       arvados.S3VolumeDriverParameters
+       AuthToken      string    // populated automatically when IAMRole is used
+       AuthExpiration time.Time // populated automatically when IAMRole is used
+
+       cluster   *arvados.Cluster
+       volume    arvados.Volume
+       logger    logrus.FieldLogger
+       metrics   *volumeMetricsVecs
+       // Client wrapper built by check.
+       bucket    *s3AWSbucket
+       region    string
+       startOnce sync.Once
+}
+
+// s3AWSbucket pairs an aws-sdk-go-v2 S3 client with the name of the
+// bucket it operates on, and holds the counters used for I/O and API
+// usage stats (assigned in S3AWSVolume.check).
+//
+// NOTE(review): what mu guards is not visible in this part of the
+// file — confirm against its users.
+type s3AWSbucket struct {
+       bucket string
+       svc    *s3.Client
+       stats  s3awsbucketStats
+       mu     sync.Mutex
+}
+
+// chooseS3VolumeDriver is the constructor registered for the "S3"
+// volume driver. It reads only the AlternateDriver flag from the
+// volume's DriverParameters and delegates to newS3AWSVolume
+// (aws-sdk-go-v2) when the flag is true, or to newS3Volume (goamz)
+// otherwise. The selected constructor decodes the full parameters
+// itself.
+func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+       // Decode just the feature flag: a fully populated S3Volume
+       // built here would be thrown away by either constructor.
+       var params struct {
+               AlternateDriver bool
+       }
+       if err := json.Unmarshal(volume.DriverParameters, &params); err != nil {
+               return nil, err
+       }
+       if params.AlternateDriver {
+               logger.Debugln("Using alternate S3 driver (aws-go)")
+               return newS3AWSVolume(cluster, volume, logger, metrics)
+       }
+       logger.Debugln("Using standard S3 driver (goamz)")
+       return newS3Volume(cluster, volume, logger, metrics)
+}
+
+// s3AWSKeepBlockRegexp matches a bare Keep block hash: exactly 32
+// lowercase hexadecimal characters, with nothing before or after.
+var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+// s3AWSZeroTime is the zero time.Time value. It is presumably used as a
+// "no timestamp" sentinel; its users are not visible here — confirm.
+var s3AWSZeroTime time.Time
+
+func (v *S3AWSVolume) isKeepBlock(s string) bool {
+       return s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+       logger.Debugln("in newS3AWSVolume")
+       v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
+       err := json.Unmarshal(volume.DriverParameters, &v)
+       if err != nil {
+               return nil, err
+       }
+       v.logger = logger.WithField("Volume", v.String())
+       v.logger.Debugln("in newS3AWSVolume after volume set")
+       return v, v.check("")
+}
+
+func (v *S3AWSVolume) translateError(err error) error {
+       if aerr, ok := err.(awserr.Error); ok {
+               switch aerr.Code() {
+               case "NotFound":
+                       return os.ErrNotExist
+               case "NoSuchKey":
+                       return os.ErrNotExist
+               }
+       }
+       return err
+}
+
+// safeCopy calls CopyObjectRequest, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+//
+// (If something goes wrong during the copy, the error will be embedded in the
+// 200 OK response)
+func (v *S3AWSVolume) safeCopy(dst, src string) error {
+       input := &s3.CopyObjectInput{
+               Bucket:      aws.String(v.bucket.bucket),
+               ContentType: aws.String("application/octet-stream"),
+               CopySource:  aws.String(v.bucket.bucket + "/" + src),
+               Key:         aws.String(dst),
+       }
+
+       req := v.bucket.svc.CopyObjectRequest(input)
+       resp, err := req.Send(context.Background())
+
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
+       }
+
+       if resp.CopyObjectResult.LastModified == nil {
+               return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
+       } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
+               return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+       }
+       return nil
+}
+
+func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+       if v.Bucket == "" {
+               return errors.New("DriverParameters: Bucket must be provided")
+       }
+       if v.IndexPageSize == 0 {
+               v.IndexPageSize = 1000
+       }
+       if v.RaceWindow < 0 {
+               return errors.New("DriverParameters: RaceWindow must not be negative")
+       }
+
+       defaultResolver := endpoints.NewDefaultResolver()
+
+       cfg := defaults.Config()
+
+       if v.Endpoint == "" && v.Region == "" {
+               return fmt.Errorf("AWS region or endpoint must be specified")
+       } else if v.Endpoint != "" || ec2metadataHostname != "" {
+               myCustomResolver := func(service, region string) (aws.Endpoint, error) {
+                       if v.Endpoint != "" && service == "s3" {
+                               return aws.Endpoint{
+                                       URL:           v.Endpoint,
+                                       SigningRegion: v.Region,
+                               }, nil
+                       } else if service == "ec2metadata" && ec2metadataHostname != "" {
+                               return aws.Endpoint{
+                                       URL: ec2metadataHostname,
+                               }, nil
+                       }
+
+                       return defaultResolver.ResolveEndpoint(service, region)
+               }
+               cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
+       }
+
+       cfg.Region = v.Region
+
+       // Zero timeouts mean "wait forever", which is a bad
+       // default. Default to long timeouts instead.
+       if v.ConnectTimeout == 0 {
+               v.ConnectTimeout = s3DefaultConnectTimeout
+       }
+       if v.ReadTimeout == 0 {
+               v.ReadTimeout = s3DefaultReadTimeout
+       }
+
+       creds := aws.NewChainProvider(
+               []aws.CredentialsProvider{
+                       aws.NewStaticCredentialsProvider(v.AccessKey, v.SecretKey, v.AuthToken),
+                       ec2rolecreds.New(ec2metadata.New(cfg)),
+               })
+
+       cfg.Credentials = creds
+
+       v.bucket = &s3AWSbucket{
+               bucket: v.Bucket,
+               svc:    s3.New(cfg),
+       }
+
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
+       return nil
+}
+
+// String implements fmt.Stringer, rendering the volume as
+// "s3-bucket:" followed by the ASCII-quoted bucket name.
+func (v *S3AWSVolume) String() string {
+       const prefix = "s3-bucket:"
+       return prefix + fmt.Sprintf("%+q", v.Bucket)
+}
+
+// GetDeviceID returns a globally unique ID for the storage bucket.
+func (v *S3AWSVolume) GetDeviceID() string {
+       return "s3://" + v.Endpoint + "/" + v.Bucket
+}
+
+// Compare checks that the block stored under loc matches expect.
+//
+// It first HEADs "recent/"+loc and only reads loc itself once that
+// succeeds (see the comment below for why). "Not found" errors are
+// translated to os.ErrNotExist; if ctx is done while waiting for the
+// HEAD, ctx.Err() is returned.
+//
+// loc[:32] is passed to compareReaderWithBuf, so loc is assumed to be
+// at least 32 characters long (a block hash) — callers must ensure it.
+func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+       // v.Head takes no ctx, so run it in a goroutine and stop waiting
+       // if ctx is done. The channel is buffered so the goroutine can
+       // always deliver its result and exit.
+       errChan := make(chan error, 1)
+       go func() {
+               _, err := v.Head("recent/" + loc)
+               errChan <- err
+       }()
+       var err error
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       case err = <-errChan:
+       }
+       if err != nil {
+               // Checking for "loc" itself here would interfere with
+               // future GET requests.
+               //
+               // On AWS, if X doesn't exist, a HEAD or GET request
+               // for X causes X's non-existence to be cached. Thus,
+               // if we test for X, then create X and return a
+               // signature to our client, the client might still get
+               // 404 from all keepstores when trying to read it.
+               //
+               // To avoid this, we avoid doing HEAD X or GET X until
+               // we know X has been written.
+               //
+               // Note that X might exist even though recent/X
+               // doesn't: for example, the response to HEAD recent/X
+               // might itself come from a stale cache. In such
+               // cases, we will return a false negative and
+               // PutHandler might needlessly create another replica
+               // on a different volume. That's not ideal, but it's
+               // better than passing the eventually-consistent
+               // problem on to our clients.
+               return v.translateError(err)
+       }
+
+       input := &s3.GetObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       }
+
+       req := v.bucket.svc.GetObjectRequest(input)
+       result, err := req.Send(ctx)
+       if err != nil {
+               return v.translateError(err)
+       }
+       // Always close the response body so the underlying HTTP
+       // connection is released, even when the comparison stops early.
+       defer result.Body.Close()
+       return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
+}
+
+// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
+// and deletes them from the volume.
+func (v *S3AWSVolume) EmptyTrash() {
+       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+               return
+       }
+
+       var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+       // Define "ready to delete" as "...when EmptyTrash started".
+       startT := time.Now()
+
+       emptyOneKey := func(trash *s3.Object) {
+               v.logger.Warnf("EmptyTrash: looking for trash marker %s with last modified date %s", *trash.Key, *trash.LastModified)
+               loc := strings.TrimPrefix(*trash.Key, "trash/")
+               if !v.isKeepBlock(loc) {
+                       return
+               }
+               atomic.AddInt64(&bytesInTrash, *trash.Size)
+               atomic.AddInt64(&blocksInTrash, 1)
+
+               trashT := *(trash.LastModified)
+               v.logger.Infof("HEEEEEEE trashT key: %s, type: %T val: %s, startT is %s", *trash.Key, trashT, trashT, startT)
+               recent, err := v.Head("recent/" + loc)
+               if err != nil && os.IsNotExist(v.translateError(err)) {
+                       v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
+                       err = v.Untrash(loc)
+                       if err != nil {
+                               v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
+                       }
+                       return
+               } else if err != nil {
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+                       return
+               }
+               v.logger.Infof("recent.LastModified type: %T val: %s", recent.LastModified, recent.LastModified)
+               if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
+                       v.logger.Infof("HERE! recent.lastmodified is smaller than blobsigningttl")
+                       if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
+                               // recent/loc is too old to protect
+                               // loc from being Trashed again during
+                               // the raceWindow that starts if we
+                               // delete trash/X now.
+                               //
+                               // Note this means (TrashSweepInterval
+                               // < BlobSigningTTL - raceWindow) is
+                               // necessary to avoid starvation.
+                               v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
+                               v.fixRace(loc)
+                               v.Touch(loc)
+                               return
+                       }
+                       _, err := v.Head(loc)
+                       if os.IsNotExist(err) {
+                               v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
+                               v.fixRace(loc)
+                               return
+                       } else if err != nil {
+                               v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+                               return
+                       }
+               }
+               if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
+                       v.logger.Infof("HERE! trashT for %s is smaller than blobtrashlifetime: %s < %s", *trash.Key, startT.Sub(trashT), v.cluster.Collections.BlobTrashLifetime.Duration())
+                       return
+               }
+               err = v.bucket.Del(*trash.Key)
+               if err != nil {
+                       v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
+                       return
+               }
+               atomic.AddInt64(&bytesDeleted, *trash.Size)
+               atomic.AddInt64(&blocksDeleted, 1)
+
+               v.logger.Infof("HERE! trash.Key %s should have been deleted", *trash.Key)
+               _, err = v.Head(loc)
+               if err == nil {
+                       v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+                       return
+               }
+               if !os.IsNotExist(v.translateError(err)) {
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+                       return
+               }
+               err = v.bucket.Del("recent/" + loc)
+               if err != nil {
+                       v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+               }
+               v.logger.Infof("HERE! recent/%s should have been deleted", loc)
+       }
+
+       var wg sync.WaitGroup
+       todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+       for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for key := range todo {
+                               emptyOneKey(key)
+                       }
+               }()
+       }
+
+       trashL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   "trash/",
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+               todo <- trash
+       }
+       close(todo)
+       wg.Wait()
+
+       if err := trashL.Error(); err != nil {
+               v.logger.WithError(err).Error("EmptyTrash: lister failed")
+       }
+       v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3AWSVolume) fixRace(loc string) bool {
+       trash, err := v.Head("trash/" + loc)
+       if err != nil {
+               if !os.IsNotExist(v.translateError(err)) {
+                       v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+               }
+               return false
+       }
+
+       recent, err := v.Head("recent/" + loc)
+       if err != nil {
+               v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+               return false
+       }
+
+       recentTime := *recent.LastModified
+       trashTime := *trash.LastModified
+       ageWhenTrashed := trashTime.Sub(recentTime)
+       if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
+               // No evidence of a race: block hasn't been written
+               // since it became eligible for Trash. No fix needed.
+               return false
+       }
+
+       v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+       v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+       err = v.safeCopy(loc, "trash/"+loc)
+       if err != nil {
+               v.logger.WithError(err).Error("fixRace: copy failed")
+               return false
+       }
+       return true
+}
+
+func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+       input := &s3.HeadObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       }
+
+       req := v.bucket.svc.HeadObjectRequest(input)
+       res, err := req.Send(context.TODO())
+
+       v.bucket.stats.TickOps("head")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
+       v.bucket.stats.TickErr(err)
+
+       if err != nil {
+               return nil, v.translateError(err)
+       }
+       result = res.HeadObjectOutput
+       return
+}
+
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
+func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       return getWithPipe(ctx, loc, buf, v)
+}
+
+func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+       buf := make([]byte, 0, 67108864)
+       awsBuf := aws.NewWriteAtBuffer(buf)
+
+       downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 13
+       })
+
+       v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
+
+       _, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+       })
+       v.bucket.stats.TickOps("get")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
+       v.bucket.stats.TickErr(err)
+       if err != nil {
+               return nil, v.translateError(err)
+       }
+       buf = awsBuf.Bytes()
+
+       rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes)
+       return
+}
+
+// ReadBlock implements BlockReader.
+func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
+       rdr, err := v.readWorker(ctx, loc)
+
+       if err == nil {
+               _, err2 := io.Copy(w, rdr)
+               if err2 != nil {
+                       return err2
+               }
+               return err
+       }
+
+       err = v.translateError(err)
+       if !os.IsNotExist(err) {
+               return err
+       }
+
+       _, err = v.Head("recent/" + loc)
+       err = v.translateError(err)
+       if err != nil {
+               // If we can't read recent/X, there's no point in
+               // trying fixRace. Give up.
+               return err
+       }
+       if !v.fixRace(loc) {
+               err = os.ErrNotExist
+               return err
+       }
+
+       rdr, err = v.readWorker(ctx, loc)
+       if err != nil {
+               v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+               err = v.translateError(err)
+               return err
+       }
+
+       _, err = io.Copy(w, rdr)
+
+       return err
+}
+
+func (b *s3AWSbucket) PutReader(path string, r io.Reader, length int64, contType string, contentMD5 string, contentSHA256 string) error {
+       if length == 0 {
+               // aws-sdk-go will only send Content-Length: 0 when reader
+               // is nil due to net.http.Request.ContentLength
+               // behavior.  Otherwise, Content-Length header is
+               // omitted which will cause some S3 services
+               // (including AWS and Ceph RadosGW) to fail to create
+               // empty objects.
+               r = bytes.NewReader([]byte{})
+       } else {
+               r = NewCountingReader(r, b.stats.TickOutBytes)
+       }
+       uploader := s3manager.NewUploaderWithClient(b.svc)
+       _, err := uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(b.bucket),
+               Key:    aws.String(path),
+               Body:   r,
+       })
+
+       b.stats.TickOps("put")
+       b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
+       b.stats.TickErr(err)
+       return err
+}
+
+// Put writes a block.
+func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+       return putWithPipe(ctx, loc, block, v)
+}
+
+// WriteBlock implements BlockWriter.
+func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+
+       r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+       uploadInput := s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+               Body:   r,
+       }
+
+       //var contentMD5, contentSHA256 string
+       var contentMD5 string
+       md5, err := hex.DecodeString(loc)
+       if err != nil {
+               return err
+       }
+       contentMD5 = base64.StdEncoding.EncodeToString(md5)
+       // See if this is the empty block
+       if contentMD5 != "d41d8cd98f00b204e9800998ecf8427e" {
+               uploadInput.ContentMD5 = &contentMD5
+               // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
+               // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
+               // block, so there is no extra memory use to be concerned about. See
+               // makeSha256Reader in aws/signer/v4/v4.go.
+       }
+
+       // Some experimentation indicated that using concurrency 5 yields the best
+       // throughput, better than higher concurrency (10 or 13) by ~5%.
+       // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
+       // is detrimental to througput (minus ~15%).
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 5
+       })
+
+       _, err = uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions())
+
+       v.bucket.stats.TickOps("put")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+       v.bucket.stats.TickErr(err)
+       if err != nil {
+               return err
+       }
+
+       empty := bytes.NewReader([]byte{})
+       _, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + loc),
+               Body:   empty,
+       })
+       v.bucket.stats.TickOps("put")
+       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+       v.bucket.stats.TickErr(err)
+
+       return err
+}
+
+// s3awsLister iterates over the objects in Bucket whose keys start with
+// Prefix, requesting up to PageSize keys per ListObjectsV2 call and
+// counting each call in Stats. Call First, then Next until it returns
+// nil, then check Error.
+type s3awsLister struct {
+       Logger            logrus.FieldLogger
+       Bucket            *s3AWSbucket
+       Prefix            string
+       PageSize          int
+       Stats             *s3awsbucketStats
+       // ContinuationToken is the token for the next page: empty
+       // before the first page and once the last page was fetched.
+       ContinuationToken string
+       buf               []s3.Object // unconsumed items of the current page
+       err               error       // most recent listing error, if any
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3awsLister) First() *s3.Object {
+       lister.getPage()
+       return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3awsLister) Next() *s3.Object {
+       if len(lister.buf) == 0 && lister.ContinuationToken != "" {
+               lister.getPage()
+       }
+       return lister.pop()
+}
+
+// Return the most recent error encountered by First or Next.
+func (lister *s3awsLister) Error() error {
+       return lister.err
+}
+
+func (lister *s3awsLister) getPage() {
+       lister.Stats.TickOps("list")
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
+
+       var input *s3.ListObjectsV2Input
+       if lister.ContinuationToken == "" {
+               input = &s3.ListObjectsV2Input{
+                       Bucket:  aws.String(lister.Bucket.bucket),
+                       MaxKeys: aws.Int64(int64(lister.PageSize)),
+                       Prefix:  aws.String(lister.Prefix),
+               }
+       } else {
+               input = &s3.ListObjectsV2Input{
+                       Bucket:            aws.String(lister.Bucket.bucket),
+                       MaxKeys:           aws.Int64(int64(lister.PageSize)),
+                       Prefix:            aws.String(lister.Prefix),
+                       ContinuationToken: &lister.ContinuationToken,
+               }
+       }
+
+       req := lister.Bucket.svc.ListObjectsV2Request(input)
+       resp, err := req.Send(context.Background())
+       if err != nil {
+               if aerr, ok := err.(awserr.Error); ok {
+                       lister.err = aerr
+               } else {
+                       lister.err = err
+               }
+               return
+       }
+
+       if *resp.IsTruncated {
+               lister.ContinuationToken = *resp.NextContinuationToken
+       } else {
+               lister.ContinuationToken = ""
+       }
+       lister.buf = make([]s3.Object, 0, len(resp.Contents))
+       for _, key := range resp.Contents {
+               if !strings.HasPrefix(*key.Key, lister.Prefix) {
+                       lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
+                       continue
+               }
+               lister.buf = append(lister.buf, key)
+       }
+}
+
+func (lister *s3awsLister) pop() (k *s3.Object) {
+       if len(lister.buf) > 0 {
+               k = &lister.buf[0]
+               lister.buf = lister.buf[1:]
+       }
+       return
+}
+
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
+func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+       // Use a merge sort to find matching sets of X and recent/X.
+       dataL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   prefix,
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       recentL := s3awsLister{
+               Logger:   v.logger,
+               Bucket:   v.bucket,
+               Prefix:   "recent/" + prefix,
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
+               if *data.Key >= "g" {
+                       // Conveniently, "recent/*" and "trash/*" are
+                       // lexically greater than all hex-encoded data
+                       // hashes, so stopping here avoids iterating
+                       // over all of them needlessly with dataL.
+                       break
+               }
+               if !v.isKeepBlock(*data.Key) {
+                       continue
+               }
+
+               // stamp is the list entry we should use to report the
+               // last-modified time for this data block: it will be
+               // the recent/X entry if one exists, otherwise the
+               // entry for the data block itself.
+               stamp := data
+
+               // Advance to the corresponding recent/X marker, if any
+               for recent != nil && recentL.Error() == nil {
+                       if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
+                               recent = recentL.Next()
+                               continue
+                       } else if cmp == 0 {
+                               stamp = recent
+                               recent = recentL.Next()
+                               break
+                       } else {
+                               // recent/X marker is missing: we'll
+                               // use the timestamp on the data
+                               // object.
+                               break
+                       }
+               }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
+               fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.UnixNano())
+       }
+       return dataL.Error()
+}
+
+// Mtime returns the stored timestamp for the given locator.
+func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+       _, err := v.Head(loc)
+       if err != nil {
+               return s3AWSZeroTime, v.translateError(err)
+       }
+       resp, err := v.Head("recent/" + loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               // The data object X exists, but recent/X is missing.
+               err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+               if err != nil {
+                       v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+                       return s3AWSZeroTime, v.translateError(err)
+               }
+               v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
+               resp, err = v.Head("recent/" + loc)
+               if err != nil {
+                       v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+                       return s3AWSZeroTime, v.translateError(err)
+               }
+       } else if err != nil {
+               // HEAD recent/X failed for some other reason.
+               return s3AWSZeroTime, err
+       }
+       return *resp.LastModified, err
+}
+
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
+func (v *S3AWSVolume) Status() *VolumeStatus {
+       return &VolumeStatus{
+               DeviceNum: 1,
+               BytesFree: BlockSize * 1000,
+               BytesUsed: 1,
+       }
+}
+
+// InternalStats returns bucket I/O and API call counters.
+func (v *S3AWSVolume) InternalStats() interface{} {
+       return &v.bucket.stats
+}
+
+// Touch sets the timestamp for the given locator to the current time.
+func (v *S3AWSVolume) Touch(loc string) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+       _, err := v.Head(loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) && v.fixRace(loc) {
+               // The data object got trashed in a race, but fixRace
+               // rescued it.
+       } else if err != nil {
+               return err
+       }
+       err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+       return v.translateError(err)
+}
+
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3AWSVolume) checkRaceWindow(loc string) error {
+       resp, err := v.Head("trash/" + loc)
+       err = v.translateError(err)
+       if os.IsNotExist(err) {
+               // OK, trash/X doesn't exist so we're not in the race
+               // window
+               return nil
+       } else if err != nil {
+               // Error looking up trash/X. We don't know whether
+               // we're in the race window
+               return err
+       }
+       t := resp.LastModified
+       safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
+       if safeWindow <= 0 {
+               // We can't count on "touch trash/X" to prolong
+               // trash/X's lifetime. The new timestamp might not
+               // become visible until now+raceWindow, and EmptyTrash
+               // is allowed to delete trash/X before then.
+               return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+       }
+       // trash/X exists, but it won't be eligible for deletion until
+       // after now+raceWindow, so it's safe to overwrite it.
+       return nil
+}
+
+func (b *s3AWSbucket) Del(path string) error {
+       input := &s3.DeleteObjectInput{
+               Bucket: aws.String(b.bucket),
+               Key:    aws.String(path),
+       }
+       req := b.svc.DeleteObjectRequest(input)
+       _, err := req.Send(context.Background())
+       //err := b.Bucket().Del(path)
+       b.stats.TickOps("delete")
+       b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
+       b.stats.TickErr(err)
+       return err
+}
+
+// Trash a Keep block.
+func (v *S3AWSVolume) Trash(loc string) error {
+       if v.volume.ReadOnly {
+               return MethodDisabledError
+       }
+       if t, err := v.Mtime(loc); err != nil {
+               return err
+       } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
+               return nil
+       }
+       if v.cluster.Collections.BlobTrashLifetime == 0 {
+               if !v.UnsafeDelete {
+                       return ErrS3TrashDisabled
+               }
+               return v.translateError(v.bucket.Del(loc))
+       }
+       err := v.checkRaceWindow(loc)
+       if err != nil {
+               return err
+       }
+       err = v.safeCopy("trash/"+loc, loc)
+       if err != nil {
+               return err
+       }
+       return v.translateError(v.bucket.Del(loc))
+}
+
+// Untrash moves block from trash back into store
+func (v *S3AWSVolume) Untrash(loc string) error {
+       err := v.safeCopy(loc, "trash/"+loc)
+       if err != nil {
+               return err
+       }
+       err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+       return v.translateError(err)
+}
+
+// s3awsbucketStats holds the per-bucket counters returned by
+// InternalStats. The embedded statsTicker supplies the op/byte/error
+// tickers. Each request increments Ops plus the counter for its kind.
+type s3awsbucketStats struct {
+       statsTicker
+       Ops     uint64
+       GetOps  uint64
+       PutOps  uint64
+       HeadOps uint64
+       DelOps  uint64
+       ListOps uint64
+}
+
+// TickErr counts err (if non-nil) under a key built from its Go type.
+// For AWS errors the key also includes the HTTP status code ("000"
+// when the error is not a request failure) and the AWS error code.
+func (s *s3awsbucketStats) TickErr(err error) {
+       if err == nil {
+               return
+       }
+       errType := fmt.Sprintf("%T", err)
+       // awserr.RequestFailure embeds awserr.Error, so match it first.
+       switch aerr := err.(type) {
+       case awserr.RequestFailure:
+               // A service error occurred
+               errType += fmt.Sprintf(" %d %s", aerr.StatusCode(), aerr.Code())
+       case awserr.Error:
+               errType += fmt.Sprintf(" 000 %s", aerr.Code())
+       }
+       s.statsTicker.TickErr(err, errType)
+}
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
new file mode 100644 (file)
index 0000000..46fe07d
--- /dev/null
@@ -0,0 +1,657 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "bytes"
+       "context"
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "strings"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/service/s3"
+       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+
+       "github.com/johannesboyne/gofakes3"
+       "github.com/johannesboyne/gofakes3/backend/s3mem"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+const (
+       S3AWSTestBucketName = "testbucket"
+)
+
+// s3AWSFakeClock is a test clock: Now returns *now when now is set,
+// and the real current time otherwise.
+type s3AWSFakeClock struct {
+       now *time.Time // pinned time; nil means "use the real clock"
+}
+
+func (c *s3AWSFakeClock) Now() time.Time {
+       if c.now == nil {
+               return time.Now()
+       }
+       return *c.now
+}
+
+func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+       return c.Now().Sub(t)
+}
+
+var (
+       // Register StubbedS3AWSSuite with gocheck.
+       _ = check.Suite(&StubbedS3AWSSuite{})
+
+       // NOTE(review): srv is not referenced in this file; confirm no
+       // other file in the package uses it before removing it.
+       srv httptest.Server
+)
+
+// StubbedS3AWSSuite tests S3AWSVolume against stub HTTP servers.
+// SetUpTest resets its state before each test.
+type StubbedS3AWSSuite struct {
+       s3server *httptest.Server // stub S3 endpoint; nil unless a test sets one
+       metadata *httptest.Server // fake EC2 metadata service; nil unless a test sets one
+       cluster  *arvados.Cluster
+       handler  *handler
+       volumes  []*TestableS3AWSVolume
+}
+
+func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+       s.s3server = nil
+       s.metadata = nil
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
+               "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
+       }
+       s.handler = &handler{}
+}
+
+func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               // Use a negative raceWindow so s3test's 1-second
+               // timestamp precision doesn't confuse fixRace.
+               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       })
+}
+
+func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+       DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+       })
+}
+
+// TestIndex stores 256 blocks and checks that IndexTo returns the
+// expected number of entries for several prefixes.
+func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+       // A tiny page size makes IndexTo page through the listing.
+       v.IndexPageSize = 3
+       for i := 0; i < 256; i++ {
+               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte("foo"))
+       }
+       cases := []struct {
+               prefix      string
+               expectMatch int
+       }{
+               {"", 256},
+               {"c", 16},
+               {"bc", 1},
+               {"abc", 0},
+       }
+       for _, tc := range cases {
+               var buf bytes.Buffer
+               c.Check(v.IndexTo(tc.prefix, &buf), check.IsNil)
+
+               // One line per match, plus the empty remainder after the
+               // final newline.
+               lines := bytes.SplitAfter(buf.Bytes(), []byte("\n"))
+               c.Check(len(lines), check.Equals, tc.expectMatch+1)
+               c.Check(len(lines[len(lines)-1]), check.Equals, 0)
+       }
+}
+
+// TestSignature checks that requests are signed with AWS Signature
+// Version 4 (an "AWS4-HMAC-SHA256" Authorization header).
+func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+       var header http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header
+       }))
+       defer stub.Close()
+
+       // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
+       // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+       vol := S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       AccessKey: "xxx",
+                       SecretKey: "xxx",
+                       Endpoint:  stub.URL,
+                       Region:    "test-region-1",
+                       Bucket:    "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := vol.check("")
+       // vol.bucket is dereferenced below, so stop here if check failed
+       // instead of panicking.
+       c.Assert(err, check.IsNil)
+       // Our test S3 server uses the older 'Path Style'
+       vol.bucket.svc.ForcePathStyle = true
+
+       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+               exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+               // Literal example from
+               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+               // but with updated timestamps
+               io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+       }))
+       defer s.metadata.Close()
+
+       v := &S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/latest/api/token",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := v.check(s.metadata.URL + "/latest")
+       creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       fmt.Printf("%+v, %s\n", creds, err)
+       c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               w.WriteHeader(http.StatusNotFound)
+       }))
+       deadv := &S3AWSVolume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err = deadv.check(s.metadata.URL + "/latest")
+       _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+       c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+       c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+}
+
+// TestStats checks the op, error and byte counters exposed by
+// InternalStats across a failed Get, a Put and two successful Gets.
+func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       stats := func() string {
+               buf, err := json.Marshal(v.InternalStats())
+               c.Check(err, check.IsNil)
+               return string(buf)
+       }
+       expect := func(patterns ...string) {
+               for _, pattern := range patterns {
+                       c.Check(stats(), check.Matches, pattern)
+               }
+       }
+
+       expect(`.*"Ops":0,.*`)
+
+       const loc = "acbd18db4cc2f85cedef654fccc4a4d8"
+       // Reading a block that doesn't exist fails and is counted as a 404.
+       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.NotNil)
+       expect(`.*"Ops":[^0],.*`, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`, `.*"InBytes":0,.*`)
+
+       // Writing takes two PUTs: the data object and its recent/ marker.
+       err = v.Put(context.Background(), loc, []byte("foo"))
+       c.Check(err, check.IsNil)
+       expect(`.*"OutBytes":3,.*`, `.*"PutOps":2,.*`)
+
+       for i := 0; i < 2; i++ {
+               _, err = v.Get(context.Background(), loc, make([]byte, 3))
+               c.Check(err, check.IsNil)
+       }
+       expect(`.*"InBytes":6,.*`)
+}
+
+// s3AWSBlockingHandler is a stub S3 server. It accepts PutBucket
+// requests immediately. Every other request is sent on requested (if
+// non-nil), then held until unblock yields (if non-nil), then answered
+// with 404.
+type s3AWSBlockingHandler struct {
+       requested chan *http.Request
+       unblock   chan struct{}
+}
+
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
+               // Accept PutBucket ("PUT /bucketname/"), called by
+               // newTestableVolume
+               return
+       }
+       if h.requested != nil {
+               h.requested <- r
+       }
+       if h.unblock != nil {
+               <-h.unblock
+       }
+       http.Error(w, "nothing here", http.StatusNotFound)
+}
+
+func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       buf := make([]byte, 3)
+
+       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+               _, err := v.Get(ctx, loc, buf)
+               return err
+       })
+}
+
+// TestCompareContextCancel checks that Compare returns context.Canceled
+// when its context is cancelled while the S3 request is in flight.
+func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
+       const loc = "acbd18db4cc2f85cedef654fccc4a4d8"
+       compare := func(ctx context.Context, v *TestableS3AWSVolume) error {
+               return v.Compare(ctx, loc, []byte("bar"))
+       }
+       s.testContextCancel(c, compare)
+}
+
+// TestPutContextCancel checks that Put returns context.Canceled when
+// its context is cancelled while the S3 request is in flight.
+func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+       const loc = "acbd18db4cc2f85cedef654fccc4a4d8"
+       put := func(ctx context.Context, v *TestableS3AWSVolume) error {
+               return v.Put(ctx, loc, []byte("foo"))
+       }
+       s.testContextCancel(c, put)
+}
+
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+       handler := &s3AWSBlockingHandler{}
+       s.s3server = httptest.NewServer(handler)
+       defer s.s3server.Close()
+
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+
+       ctx, cancel := context.WithCancel(context.Background())
+
+       handler.requested = make(chan *http.Request)
+       handler.unblock = make(chan struct{})
+       defer close(handler.unblock)
+
+       doneFunc := make(chan struct{})
+       go func() {
+               err := testFunc(ctx, v)
+               c.Check(err, check.Equals, context.Canceled)
+               close(doneFunc)
+       }()
+
+       timeout := time.After(10 * time.Second)
+
+       // Wait for the stub server to receive a request, meaning
+       // Get() is waiting for an s3 operation.
+       select {
+       case <-timeout:
+               c.Fatal("timed out waiting for test func to call our handler")
+       case <-doneFunc:
+               c.Fatal("test func finished without even calling our handler!")
+       case <-handler.requested:
+       }
+
+       cancel()
+
+       select {
+       case <-timeout:
+               c.Fatal("timed out")
+       case <-doneFunc:
+       }
+}
+
+// TestBackendStates builds, for each scenario in the table below, some
+// combination of data object, "recent/" marker and "trash/" copy (each
+// either absent or written with a chosen timestamp). It then checks the
+// outcome of Get, Trash, Untrash, EmptyTrash and Put against the table.
+func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+       s.cluster.Collections.BlobTrashLifetime.Set("1h")
+       s.cluster.Collections.BlobSigningTTL.Set("1h")
+
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       // Close the fake S3 server started by newTestableVolume when
+       // the test finishes.
+       defer v.Teardown()
+
+       // none (the zero time) marks an object that does not exist in a
+       // scenario.
+       var none time.Time
+
+       // putS3Obj uploads data as key while the fake server's clock is
+       // pinned to t, so the object is stored with timestamp t.
+       // Nothing is written when t is none. Any failure aborts the
+       // test.
+       putS3Obj := func(t time.Time, key string, data []byte) {
+               if t.IsZero() {
+                       return
+               }
+               v.serverClock.now = &t
+               uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+               _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+                       Bucket: aws.String(v.bucket.bucket),
+                       Key:    aws.String(key),
+                       Body:   bytes.NewReader(data),
+               })
+               c.Assert(err, check.IsNil)
+               v.serverClock.now = nil
+               // Confirm the object is visible before the scenario
+               // depends on it.
+               _, err = v.Head(key)
+               c.Assert(err, check.IsNil)
+       }
+
+       t0 := time.Now()
+       nextKey := 0
+       for _, scenario := range []struct {
+               label               string
+               dataT               time.Time
+               recentT             time.Time
+               trashT              time.Time
+               canGet              bool
+               canTrash            bool
+               canGetAfterTrash    bool
+               canUntrash          bool
+               haveTrashAfterEmpty bool
+               freshAfterEmpty     bool
+       }{
+               {
+                       "No related objects",
+                       none, none, none,
+                       false, false, false, false, false, false,
+               },
+               {
+                       // Stored by older version, or there was a
+                       // race between EmptyTrash and Put: Trash is a
+                       // no-op even though the data object is very
+                       // old
+                       "No recent/X",
+                       t0.Add(-48 * time.Hour), none, none,
+                       true, true, true, false, false, false,
+               },
+               {
+                       "Not trash, but old enough to be eligible for trash",
+                       t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+                       true, true, false, false, false, false,
+               },
+               {
+                       "Not trash, and not old enough to be eligible for trash",
+                       t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+                       true, true, true, false, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, due to recent race between Trash and Put",
+                       t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+                       true, true, true, true, true, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+                       true, false, true, true, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
+                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+                       true, false, true, true, true, true,
+               },
+               {
+                       "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
+                       t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+                       true, true, true, true, false, false,
+               },
+               {
+                       "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
+                       t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+                       true, false, true, true, false, false,
+               },
+               {
+                       "Trash, not yet eligible for deletion",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+                       false, false, false, true, true, false,
+               },
+               {
+                       "Trash, not yet eligible for deletion, prone to races",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+                       false, false, false, true, true, false,
+               },
+               {
+                       "Trash, eligible for deletion",
+                       none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+                       false, false, false, true, false, false,
+               },
+               {
+                       "Erroneously trashed during a race, detected before BlobTrashLifetime",
+                       none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
+                       none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+                       true, false, true, true, true, false,
+               },
+               {
+                       "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+                       none, none, t0.Add(-time.Minute),
+                       false, false, false, true, true, true,
+               },
+       } {
+               c.Log("Scenario: ", scenario.label)
+
+               // We have a few tests to run for each scenario, and
+               // the tests are expected to change state. By calling
+               // this setup func between tests, we (re)create the
+               // scenario as specified, using a new unique block
+               // locator to prevent interference from previous
+               // tests.
+
+               setupScenario := func() (string, []byte) {
+                       nextKey++
+                       blk := []byte(fmt.Sprintf("%d", nextKey))
+                       loc := fmt.Sprintf("%x", md5.Sum(blk))
+                       c.Log("\t", loc)
+                       putS3Obj(scenario.dataT, loc, blk)
+                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
+                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
+                       // Subsequent operations see the server clock at t0.
+                       v.serverClock.now = &t0
+                       return loc, blk
+               }
+
+               // Check canGet
+               loc, blk := setupScenario()
+               buf := make([]byte, len(blk))
+               _, err := v.Get(context.Background(), loc, buf)
+               c.Check(err == nil, check.Equals, scenario.canGet)
+               if err != nil {
+                       c.Check(os.IsNotExist(err), check.Equals, true)
+               }
+
+               // Call Trash, then check canTrash and canGetAfterTrash
+               loc, _ = setupScenario()
+               err = v.Trash(loc)
+               c.Check(err == nil, check.Equals, scenario.canTrash)
+               _, err = v.Get(context.Background(), loc, buf)
+               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+               if err != nil {
+                       c.Check(os.IsNotExist(err), check.Equals, true)
+               }
+
+               // Call Untrash, then check canUntrash
+               loc, _ = setupScenario()
+               err = v.Untrash(loc)
+               c.Check(err == nil, check.Equals, scenario.canUntrash)
+               if scenario.dataT != none || scenario.trashT != none {
+                       // In all scenarios where the data exists, we
+                       // should be able to Get after Untrash --
+                       // regardless of timestamps, errors, race
+                       // conditions, etc.
+                       _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err, check.IsNil)
+               }
+
+               // Call EmptyTrash, then check haveTrashAfterEmpty and
+               // freshAfterEmpty
+               loc, _ = setupScenario()
+               v.EmptyTrash()
+               _, err = v.Head("trash/" + loc)
+               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+               if scenario.freshAfterEmpty {
+                       t, err := v.Mtime(loc)
+                       c.Check(err, check.IsNil)
+                       // new mtime must be current (with an
+                       // allowance for 1s timestamp precision)
+                       c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+               }
+
+               // Check for current Mtime after Put (applies to all
+               // scenarios)
+               loc, blk = setupScenario()
+               err = v.Put(context.Background(), loc, blk)
+               c.Check(err, check.IsNil)
+               t, err := v.Mtime(loc)
+               c.Check(err, check.IsNil)
+               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+       }
+}
+
+// TestableS3AWSVolume is an S3AWSVolume plus the test fixtures that
+// newTestableVolume creates for it.
+type TestableS3AWSVolume struct {
+       *S3AWSVolume
+       // server is the fake S3 server started by newTestableVolume;
+       // Teardown closes it.
+       server      *httptest.Server
+       c           *check.C
+       // serverClock is the fake server's time source. Tests set
+       // serverClock.now to control the timestamps it assigns, and
+       // reset it to nil afterwards.
+       serverClock *s3AWSFakeClock
+}
+
+// LogrusLog forwards log messages from the gofakes3 fake S3 server
+// (installed with gofakes3.WithLogger in newTestableVolume) to a logrus
+// logger.
+type LogrusLog struct {
+       // log holds the interface value directly; a pointer to an
+       // interface adds indirection without any benefit.
+       log logrus.FieldLogger
+}
+
+// Print writes v to the logrus logger at the level matching the given
+// gofakes3 level. It panics on a level it does not recognize.
+func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+       switch level {
+       case gofakes3.LogErr:
+               l.log.Errorln(v...)
+       case gofakes3.LogWarn:
+               l.log.Warnln(v...)
+       case gofakes3.LogInfo:
+               l.log.Infoln(v...)
+       default:
+               panic("unknown level")
+       }
+}
+
+// newTestableVolume returns an S3AWSVolume backed by an in-memory fake
+// S3 server (gofakes3 + s3mem). The fake server's clock can be set
+// through the returned volume's serverClock.
+//
+// If s.s3server is set, the volume talks to that server instead. The
+// fake server is still started, and Teardown still closes it.
+//
+// If s.metadata is set, IAMRole points at that server and the static
+// keys are left empty.
+//
+// The test bucket is created before returning, and RaceWindow is set to
+// raceWindow.
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+       clock := &s3AWSFakeClock{}
+       // fake s3: in-memory backend and HTTP front end, both using
+       // clock as their time source.
+       backend := s3mem.New(s3mem.WithTimeSource(clock))
+       logger := &LogrusLog{log: ctxlog.FromContext(context.Background())}
+       // WithTimeSkewLimit(0) should disable gofakes3's client/server
+       // clock-skew check, since tests move the server clock around.
+       faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(logger), gofakes3.WithTimeSkewLimit(0))
+       srv := httptest.NewServer(faker.Server())
+
+       endpoint := srv.URL
+       if s.s3server != nil {
+               endpoint = s.s3server.URL
+       }
+
+       iamRole, accessKey, secretKey := "", "xxx", "xxx"
+       if s.metadata != nil {
+               iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+       }
+
+       v := &TestableS3AWSVolume{
+               S3AWSVolume: &S3AWSVolume{
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               IAMRole:            iamRole,
+                               AccessKey:          accessKey,
+                               SecretKey:          secretKey,
+                               Bucket:             S3AWSTestBucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster: cluster,
+                       volume:  volume,
+                       logger:  ctxlog.TestLogger(c),
+                       metrics: metrics,
+               },
+               c:           c,
+               server:      srv,
+               serverClock: clock,
+       }
+       c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+       // Our test S3 server uses the older 'Path Style'
+       v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+       // Create the testbucket
+       input := &s3.CreateBucketInput{
+               Bucket: aws.String(S3AWSTestBucketName),
+       }
+       req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+       _, err := req.Send(context.Background())
+       c.Assert(err, check.IsNil)
+       // We couldn't set RaceWindow until now because check()
+       // rejects negative values.
+       v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+       return v
+}
+
+// PutRaw skips the ContentMD5 test
+func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+
+       r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+               u.PartSize = 5 * 1024 * 1024
+               u.Concurrency = 13
+       })
+
+       _, err := uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String(loc),
+               Body:   r,
+       })
+       if err != nil {
+               v.logger.Printf("PutRaw: %s: %+v", loc, err)
+       }
+
+       empty := bytes.NewReader([]byte{})
+       _, err = uploader.Upload(&s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + loc),
+               Body:   empty,
+       })
+       if err != nil {
+               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+       }
+}
+
+// TouchWithDate turns back the clock while doing a Touch(). We assume
+// there are no other operations happening on the same s3test server
+// while we do this.
+func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+       v.serverClock.now = &lastPut
+
+       uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+       empty := bytes.NewReader([]byte{})
+       _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+               Bucket: aws.String(v.bucket.bucket),
+               Key:    aws.String("recent/" + locator),
+               Body:   empty,
+       })
+       if err != nil {
+               panic(err)
+       }
+
+       v.serverClock.now = nil
+}
+
+func (v *TestableS3AWSVolume) Teardown() {
+       v.server.Close()
+}
+
+func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "put"
+}