# might be needed for other S3-compatible services.
V2Signature: false
+ # Use the AWS S3 v2 Go driver instead of the goamz driver.
+ UseAWSS3v2Driver: false
+
# Requested page size for "list bucket contents" requests.
IndexPageSize: 1000
# classes" in the "Admin" section of doc.arvados.org.
StorageClasses: null
</code></pre></notextile>
+
+Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @UseAWSS3v2Driver@ flag to @true@.
+
+The @aws-sdk-go-v2@ does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third party S3-compatible service.
+
+The @aws-sdk-go-v2@ driver can improve read performance by 50-100% over the @goamz@ driver, but it has not had as much production use. See the "wiki":https://dev.arvados.org/projects/arvados/wiki/Keep_real_world_performance_numbers for details.
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/arvados/cgofuse v1.2.0-arvados1
github.com/aws/aws-sdk-go v1.25.30
+ github.com/aws/aws-sdk-go-v2 v0.23.0
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
github.com/coreos/go-oidc v2.1.0+incompatible
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmcvetta/randutil v0.0.0-20150817122601-2bb1b664bcff
github.com/jmoiron/sqlx v1.2.0
+ github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc
github.com/julienschmidt/httprouter v1.2.0
github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7 // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
github.com/stretchr/testify v1.4.0 // indirect
github.com/xanzy/ssh-agent v0.1.0 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
- golang.org/x/net v0.0.0-20190620200207-3b0461eec859
+ golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
google.golang.org/api v0.13.0
github.com/arvados/cgofuse v1.2.0-arvados1/go.mod h1:79WFV98hrkRHK9XPhh2IGGOwpFSjocsWubgxAs2KhRc=
github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef h1:cl7DIRbiAYNqaVxg3CZY8qfZoBOKrj06H/x9SPGaxas=
github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef/go.mod h1:rCtgyMmBGEbjTm37fCuBYbNL0IhztiALzo3OB9HyiOM=
+github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
+github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092 h1:0Di2onNnlN5PAyWPbqlPyN45eOQ+QW/J9eqLynt4IV4=
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092/go.mod h1:8IzBjZCRSnsvM6MJMG8HNNtnzMl48H22rbJL2kRUJ0Y=
github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
-github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc/go.mod h1:fNiSoOiEI5KlkWXn26OwKnNe58ilTIkpBlgOrt7Olu8=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5 h1:Jw7W4WMfQDxsXvfeFSaS2cHlY7bAF4MGrgnbd0+Uo78=
github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
+github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63/go.mod h1:n+VKSARF5y/tS9XFSP7vWDfS+GUC5vs/YT7M5XDTUEM=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/src-d/gcfg v1.3.0 h1:2BEDr8r0I0b8h/fOqwtxCEiq2HJu8n2JGZJQFGXWLjg=
github.com/src-d/gcfg v1.3.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190310074541-c10a0554eabf/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190310054646-10058d7d4faa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190308174544-00c44ba9c14f/go.mod h1:25r3+/G6/xytQM8iWZKq3Hn0kr0rgFKPUNVEL/dr3z4=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c h1:97SnQk1GYRXJgvwZ8fadnxDOWfKvkNQHH3CtZntPSrM=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/src-d/go-billy.v4 v4.0.1 h1:iMxwQPj2cuKRyaIZ985zxClkcdTtT5VpXYf4PTJc0Ek=
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ # Use aws-s3-go (v2) instead of goamz
+ UseAWSS3v2Driver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ # Use aws-s3-go (v2) instead of goamz
+ UseAWSS3v2Driver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
Bucket string
LocationConstraint bool
V2Signature bool
+ UseAWSS3v2Driver bool
IndexPageSize int
ConnectTimeout Duration
ReadTimeout Duration
return err
}
*ss = make(map[string]struct{}, len(hash))
- for t, _ := range hash {
+ for t := range hash {
(*ss)[t] = struct{}{}
}
)
func init() {
- driver["S3"] = newS3Volume
+ driver["S3"] = chooseS3VolumeDriver
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+ err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/awserr"
+ "github.com/aws/aws-sdk-go-v2/aws/defaults"
+ "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
+ "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
+ "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+)
+
+// S3Volume implements Volume using an S3 bucket.
+type S3AWSVolume struct {
+ arvados.S3VolumeDriverParameters
+ AuthToken string // populated automatically when IAMRole is used
+ AuthExpiration time.Time // populated automatically when IAMRole is used
+
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bucket *s3AWSbucket
+ region string
+ startOnce sync.Once
+}
+
+// s3bucket wraps s3.bucket and counts I/O and API usage stats. The
+// wrapped bucket can be replaced atomically with SetBucket in order
+// to update credentials.
+type s3AWSbucket struct {
+ bucket string
+ svc *s3.Client
+ stats s3awsbucketStats
+ mu sync.Mutex
+}
+
+// chooseS3VolumeDriver distinguishes between the old goamz driver and
+// aws-sdk-go based on the UseAWSS3v2Driver feature flag
+func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, v)
+ if err != nil {
+ return nil, err
+ }
+ if v.UseAWSS3v2Driver {
+ logger.Debugln("Using AWS S3 v2 driver")
+ return newS3AWSVolume(cluster, volume, logger, metrics)
+ } else {
+ logger.Debugln("Using goamz S3 driver")
+ return newS3Volume(cluster, volume, logger, metrics)
+ }
+}
+
+const (
+ PartSize = 5 * 1024 * 1024
+ ReadConcurrency = 13
+ WriteConcurrency = 5
+)
+
+var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+var s3AWSZeroTime time.Time
+
+func (v *S3AWSVolume) isKeepBlock(s string) bool {
+ return s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+ v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, v)
+ if err != nil {
+ return nil, err
+ }
+ v.logger = logger.WithField("Volume", v.String())
+ return v, v.check("")
+}
+
+func (v *S3AWSVolume) translateError(err error) error {
+ if aerr, ok := err.(awserr.Error); ok {
+ switch aerr.Code() {
+ case "NotFound":
+ return os.ErrNotExist
+ case "NoSuchKey":
+ return os.ErrNotExist
+ }
+ }
+ return err
+}
+
+// safeCopy calls CopyObjectRequest, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+//
+// (If something goes wrong during the copy, the error will be embedded in the
+// 200 OK response)
+func (v *S3AWSVolume) safeCopy(dst, src string) error {
+ input := &s3.CopyObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ ContentType: aws.String("application/octet-stream"),
+ CopySource: aws.String(v.bucket.bucket + "/" + src),
+ Key: aws.String(dst),
+ }
+
+ req := v.bucket.svc.CopyObjectRequest(input)
+ resp, err := req.Send(context.Background())
+
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ return err
+ } else if err != nil {
+ return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
+ }
+
+ if resp.CopyObjectResult.LastModified == nil {
+ return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
+ } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
+ return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+ }
+ return nil
+}
+
+func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+ if v.Bucket == "" {
+ return errors.New("DriverParameters: Bucket must be provided")
+ }
+ if v.IndexPageSize == 0 {
+ v.IndexPageSize = 1000
+ }
+ if v.RaceWindow < 0 {
+ return errors.New("DriverParameters: RaceWindow must not be negative")
+ }
+
+ if v.V2Signature {
+ return errors.New("DriverParameters: V2Signature is not supported")
+ }
+
+ defaultResolver := endpoints.NewDefaultResolver()
+
+ cfg := defaults.Config()
+
+ if v.Endpoint == "" && v.Region == "" {
+ return fmt.Errorf("AWS region or endpoint must be specified")
+ } else if v.Endpoint != "" || ec2metadataHostname != "" {
+ myCustomResolver := func(service, region string) (aws.Endpoint, error) {
+ if v.Endpoint != "" && service == "s3" {
+ return aws.Endpoint{
+ URL: v.Endpoint,
+ SigningRegion: v.Region,
+ }, nil
+ } else if service == "ec2metadata" && ec2metadataHostname != "" {
+ return aws.Endpoint{
+ URL: ec2metadataHostname,
+ }, nil
+ }
+
+ return defaultResolver.ResolveEndpoint(service, region)
+ }
+ cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
+ }
+
+ cfg.Region = v.Region
+
+ // Zero timeouts mean "wait forever", which is a bad
+ // default. Default to long timeouts instead.
+ if v.ConnectTimeout == 0 {
+ v.ConnectTimeout = s3DefaultConnectTimeout
+ }
+ if v.ReadTimeout == 0 {
+ v.ReadTimeout = s3DefaultReadTimeout
+ }
+
+ creds := aws.NewChainProvider(
+ []aws.CredentialsProvider{
+ aws.NewStaticCredentialsProvider(v.AccessKey, v.SecretKey, v.AuthToken),
+ ec2rolecreds.New(ec2metadata.New(cfg)),
+ })
+
+ cfg.Credentials = creds
+
+ v.bucket = &s3AWSbucket{
+ bucket: v.Bucket,
+ svc: s3.New(cfg),
+ }
+
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+ v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
+ return nil
+}
+
+// String implements fmt.Stringer.
+func (v *S3AWSVolume) String() string {
+ return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
+}
+
+// GetDeviceID returns a globally unique ID for the storage bucket.
+func (v *S3AWSVolume) GetDeviceID() string {
+ return "s3://" + v.Endpoint + "/" + v.Bucket
+}
+
+// Compare the given data with the stored data.
+func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := v.Head("recent/" + loc)
+ errChan <- err
+ }()
+ var err error
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err = <-errChan:
+ }
+ if err != nil {
+ // Checking for "loc" itself here would interfere with
+ // future GET requests.
+ //
+ // On AWS, if X doesn't exist, a HEAD or GET request
+ // for X causes X's non-existence to be cached. Thus,
+ // if we test for X, then create X and return a
+ // signature to our client, the client might still get
+ // 404 from all keepstores when trying to read it.
+ //
+ // To avoid this, we avoid doing HEAD X or GET X until
+ // we know X has been written.
+ //
+ // Note that X might exist even though recent/X
+ // doesn't: for example, the response to HEAD recent/X
+ // might itself come from a stale cache. In such
+ // cases, we will return a false negative and
+ // PutHandler might needlessly create another replica
+ // on a different volume. That's not ideal, but it's
+ // better than passing the eventually-consistent
+ // problem on to our clients.
+ return v.translateError(err)
+ }
+
+ input := &s3.GetObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ }
+
+ req := v.bucket.svc.GetObjectRequest(input)
+ result, err := req.Send(ctx)
+ if err != nil {
+ return v.translateError(err)
+ }
+ return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
+}
+
+// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
+// and deletes them from the volume.
+func (v *S3AWSVolume) EmptyTrash() {
+ if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+ return
+ }
+
+ var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+ // Define "ready to delete" as "...when EmptyTrash started".
+ startT := time.Now()
+
+ emptyOneKey := func(trash *s3.Object) {
+ loc := strings.TrimPrefix(*trash.Key, "trash/")
+ if !v.isKeepBlock(loc) {
+ return
+ }
+ atomic.AddInt64(&bytesInTrash, *trash.Size)
+ atomic.AddInt64(&blocksInTrash, 1)
+
+ trashT := *(trash.LastModified)
+ recent, err := v.Head("recent/" + loc)
+ if err != nil && os.IsNotExist(v.translateError(err)) {
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
+ err = v.Untrash(loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
+ }
+ return
+ } else if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+ return
+ }
+ if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
+ if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
+ // recent/loc is too old to protect
+ // loc from being Trashed again during
+ // the raceWindow that starts if we
+ // delete trash/X now.
+ //
+ // Note this means (TrashSweepInterval
+ // < BlobSigningTTL - raceWindow) is
+ // necessary to avoid starvation.
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
+ v.fixRace(loc)
+ v.Touch(loc)
+ return
+ }
+ _, err := v.Head(loc)
+ if os.IsNotExist(err) {
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
+ v.fixRace(loc)
+ return
+ } else if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ return
+ }
+ }
+ if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
+ return
+ }
+ err = v.bucket.Del(*trash.Key)
+ if err != nil {
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
+ return
+ }
+ atomic.AddInt64(&bytesDeleted, *trash.Size)
+ atomic.AddInt64(&blocksDeleted, 1)
+
+ _, err = v.Head(loc)
+ if err == nil {
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+ return
+ }
+ if !os.IsNotExist(v.translateError(err)) {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ return
+ }
+ err = v.bucket.Del("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+ }
+ }
+
+ var wg sync.WaitGroup
+ todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+ for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for key := range todo {
+ emptyOneKey(key)
+ }
+ }()
+ }
+
+ trashL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: "trash/",
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+ todo <- trash
+ }
+ close(todo)
+ wg.Wait()
+
+ if err := trashL.Error(); err != nil {
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
+ }
+ v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3AWSVolume) fixRace(loc string) bool {
+ trash, err := v.Head("trash/" + loc)
+ if err != nil {
+ if !os.IsNotExist(v.translateError(err)) {
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+ }
+ return false
+ }
+
+ recent, err := v.Head("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+ return false
+ }
+
+ recentTime := *recent.LastModified
+ trashTime := *trash.LastModified
+ ageWhenTrashed := trashTime.Sub(recentTime)
+ if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
+ // No evidence of a race: block hasn't been written
+ // since it became eligible for Trash. No fix needed.
+ return false
+ }
+
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+ err = v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ v.logger.WithError(err).Error("fixRace: copy failed")
+ return false
+ }
+ return true
+}
+
+func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+ input := &s3.HeadObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ }
+
+ req := v.bucket.svc.HeadObjectRequest(input)
+ res, err := req.Send(context.TODO())
+
+ v.bucket.stats.TickOps("head")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
+ v.bucket.stats.TickErr(err)
+
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ result = res.HeadObjectOutput
+ return
+}
+
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
+func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ return getWithPipe(ctx, loc, buf, v)
+}
+
+func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+ buf := make([]byte, 0, 67108864)
+ awsBuf := aws.NewWriteAtBuffer(buf)
+
+ downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+ u.PartSize = PartSize
+ u.Concurrency = ReadConcurrency
+ })
+
+ v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
+
+ _, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ })
+ v.bucket.stats.TickOps("get")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
+ v.bucket.stats.TickErr(err)
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ buf = awsBuf.Bytes()
+
+ rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes)
+ return
+}
+
+// ReadBlock implements BlockReader.
+func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
+ rdr, err := v.readWorker(ctx, loc)
+
+ if err == nil {
+ _, err2 := io.Copy(w, rdr)
+ if err2 != nil {
+ return err2
+ }
+ return err
+ }
+
+ err = v.translateError(err)
+ if !os.IsNotExist(err) {
+ return err
+ }
+
+ _, err = v.Head("recent/" + loc)
+ err = v.translateError(err)
+ if err != nil {
+ // If we can't read recent/X, there's no point in
+ // trying fixRace. Give up.
+ return err
+ }
+ if !v.fixRace(loc) {
+ err = os.ErrNotExist
+ return err
+ }
+
+ rdr, err = v.readWorker(ctx, loc)
+ if err != nil {
+ v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+ err = v.translateError(err)
+ return err
+ }
+
+ _, err = io.Copy(w, rdr)
+
+ return err
+}
+
+func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader) error {
+ if r == nil {
+ // r == nil leads to a memory violation in func readFillBuf in
+ // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
+ r = bytes.NewReader(nil)
+ }
+
+ uploadInput := s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(name),
+ Body: r,
+ }
+
+ if len(name) == 32 {
+ var contentMD5 string
+ md5, err := hex.DecodeString(name)
+ if err != nil {
+ return err
+ }
+ contentMD5 = base64.StdEncoding.EncodeToString(md5)
+ uploadInput.ContentMD5 = &contentMD5
+ }
+
+ // Experimentation indicated that using concurrency 5 yields the best
+ // throughput, better than higher concurrency (10 or 13) by ~5%.
+ // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
+ // is detrimental to througput (minus ~15%).
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ u.PartSize = PartSize
+ u.Concurrency = WriteConcurrency
+ })
+
+ // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
+ // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
+ // block, so there is no extra memory use to be concerned about. See
+ // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
+ // calculating the Sha-256 because we don't need it; we already use md5sum
+ // hashes that match the name of the block.
+ _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+ r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+ }))
+
+ v.bucket.stats.TickOps("put")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+ v.bucket.stats.TickErr(err)
+
+ return err
+}
+
+// Put writes a block.
+func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+ return putWithPipe(ctx, loc, block, v)
+}
+
+// WriteBlock implements BlockWriter.
+func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+
+ r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+ err := v.writeObject(ctx, loc, r)
+ if err != nil {
+ return err
+ }
+ return v.writeObject(ctx, "recent/"+loc, nil)
+}
+
+type s3awsLister struct {
+ Logger logrus.FieldLogger
+ Bucket *s3AWSbucket
+ Prefix string
+ PageSize int
+ Stats *s3awsbucketStats
+ ContinuationToken string
+ buf []s3.Object
+ err error
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3awsLister) First() *s3.Object {
+ lister.getPage()
+ return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3awsLister) Next() *s3.Object {
+ if len(lister.buf) == 0 && lister.ContinuationToken != "" {
+ lister.getPage()
+ }
+ return lister.pop()
+}
+
+// Return the most recent error encountered by First or Next.
+func (lister *s3awsLister) Error() error {
+ return lister.err
+}
+
+func (lister *s3awsLister) getPage() {
+ lister.Stats.TickOps("list")
+ lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
+
+ var input *s3.ListObjectsV2Input
+ if lister.ContinuationToken == "" {
+ input = &s3.ListObjectsV2Input{
+ Bucket: aws.String(lister.Bucket.bucket),
+ MaxKeys: aws.Int64(int64(lister.PageSize)),
+ Prefix: aws.String(lister.Prefix),
+ }
+ } else {
+ input = &s3.ListObjectsV2Input{
+ Bucket: aws.String(lister.Bucket.bucket),
+ MaxKeys: aws.Int64(int64(lister.PageSize)),
+ Prefix: aws.String(lister.Prefix),
+ ContinuationToken: &lister.ContinuationToken,
+ }
+ }
+
+ req := lister.Bucket.svc.ListObjectsV2Request(input)
+ resp, err := req.Send(context.Background())
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok {
+ lister.err = aerr
+ } else {
+ lister.err = err
+ }
+ return
+ }
+
+ if *resp.IsTruncated {
+ lister.ContinuationToken = *resp.NextContinuationToken
+ } else {
+ lister.ContinuationToken = ""
+ }
+ lister.buf = make([]s3.Object, 0, len(resp.Contents))
+ for _, key := range resp.Contents {
+ if !strings.HasPrefix(*key.Key, lister.Prefix) {
+ lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
+ continue
+ }
+ lister.buf = append(lister.buf, key)
+ }
+}
+
+func (lister *s3awsLister) pop() (k *s3.Object) {
+ if len(lister.buf) > 0 {
+ k = &lister.buf[0]
+ lister.buf = lister.buf[1:]
+ }
+ return
+}
+
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
+func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+ // Use a merge sort to find matching sets of X and recent/X.
+ dataL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: prefix,
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ recentL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: "recent/" + prefix,
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
+ if *data.Key >= "g" {
+ // Conveniently, "recent/*" and "trash/*" are
+ // lexically greater than all hex-encoded data
+ // hashes, so stopping here avoids iterating
+ // over all of them needlessly with dataL.
+ break
+ }
+ if !v.isKeepBlock(*data.Key) {
+ continue
+ }
+
+ // stamp is the list entry we should use to report the
+ // last-modified time for this data block: it will be
+ // the recent/X entry if one exists, otherwise the
+ // entry for the data block itself.
+ stamp := data
+
+ // Advance to the corresponding recent/X marker, if any
+ for recent != nil && recentL.Error() == nil {
+ if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
+ recent = recentL.Next()
+ continue
+ } else if cmp == 0 {
+ stamp = recent
+ recent = recentL.Next()
+ break
+ } else {
+ // recent/X marker is missing: we'll
+ // use the timestamp on the data
+ // object.
+ break
+ }
+ }
+ if err := recentL.Error(); err != nil {
+ return err
+ }
+ fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.UnixNano())
+ }
+ return dataL.Error()
+}
+
+// Mtime returns the stored timestamp for the given locator.
+func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+ _, err := v.Head(loc)
+ if err != nil {
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ resp, err := v.Head("recent/" + loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // The data object X exists, but recent/X is missing.
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ if err != nil {
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ resp, err = v.Head("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ } else if err != nil {
+ // HEAD recent/X failed for some other reason.
+ return s3AWSZeroTime, err
+ }
+ return *resp.LastModified, err
+}
+
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
+func (v *S3AWSVolume) Status() *VolumeStatus {
+ return &VolumeStatus{
+ DeviceNum: 1,
+ BytesFree: BlockSize * 1000,
+ BytesUsed: 1,
+ }
+}
+
+// InternalStats returns bucket I/O and API call counters.
+func (v *S3AWSVolume) InternalStats() interface{} {
+ return &v.bucket.stats
+}
+
+// Touch sets the timestamp for the given locator to the current time.
+func (v *S3AWSVolume) Touch(loc string) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+ _, err := v.Head(loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) && v.fixRace(loc) {
+ // The data object got trashed in a race, but fixRace
+ // rescued it.
+ } else if err != nil {
+ return err
+ }
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ return v.translateError(err)
+}
+
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3AWSVolume) checkRaceWindow(loc string) error {
+ resp, err := v.Head("trash/" + loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // OK, trash/X doesn't exist so we're not in the race
+ // window
+ return nil
+ } else if err != nil {
+ // Error looking up trash/X. We don't know whether
+ // we're in the race window
+ return err
+ }
+ t := resp.LastModified
+ safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
+ if safeWindow <= 0 {
+ // We can't count on "touch trash/X" to prolong
+ // trash/X's lifetime. The new timestamp might not
+ // become visible until now+raceWindow, and EmptyTrash
+ // is allowed to delete trash/X before then.
+ return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ }
+ // trash/X exists, but it won't be eligible for deletion until
+ // after now+raceWindow, so it's safe to overwrite it.
+ return nil
+}
+
+func (b *s3AWSbucket) Del(path string) error {
+ input := &s3.DeleteObjectInput{
+ Bucket: aws.String(b.bucket),
+ Key: aws.String(path),
+ }
+ req := b.svc.DeleteObjectRequest(input)
+ _, err := req.Send(context.Background())
+ //err := b.Bucket().Del(path)
+ b.stats.TickOps("delete")
+ b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
+ b.stats.TickErr(err)
+ return err
+}
+
+// Trash a Keep block.
+func (v *S3AWSVolume) Trash(loc string) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+ if t, err := v.Mtime(loc); err != nil {
+ return err
+ } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
+ return nil
+ }
+ if v.cluster.Collections.BlobTrashLifetime == 0 {
+ if !v.UnsafeDelete {
+ return ErrS3TrashDisabled
+ }
+ return v.translateError(v.bucket.Del(loc))
+ }
+ err := v.checkRaceWindow(loc)
+ if err != nil {
+ return err
+ }
+ err = v.safeCopy("trash/"+loc, loc)
+ if err != nil {
+ return err
+ }
+ return v.translateError(v.bucket.Del(loc))
+}
+
+// Untrash moves block from trash back into store
+func (v *S3AWSVolume) Untrash(loc string) error {
+ err := v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ return err
+ }
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ return v.translateError(err)
+}
+
+type s3awsbucketStats struct {
+ statsTicker
+ Ops uint64
+ GetOps uint64
+ PutOps uint64
+ HeadOps uint64
+ DelOps uint64
+ ListOps uint64
+}
+
+func (s *s3awsbucketStats) TickErr(err error) {
+ if err == nil {
+ return
+ }
+ errType := fmt.Sprintf("%T", err)
+ if aerr, ok := err.(awserr.Error); ok {
+ if reqErr, ok := err.(awserr.RequestFailure); ok {
+ // A service error occurred
+ errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+ } else {
+ errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+ }
+ }
+ s.statsTicker.TickErr(err, errType)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+const (
+ S3AWSTestBucketName = "testbucket"
+)
+
+type s3AWSFakeClock struct {
+ now *time.Time
+}
+
+func (c *s3AWSFakeClock) Now() time.Time {
+ if c.now == nil {
+ return time.Now().UTC()
+ }
+ return c.now.UTC()
+}
+
+func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+ return c.Now().Sub(t)
+}
+
+var _ = check.Suite(&StubbedS3AWSSuite{})
+
+var srv httptest.Server
+
+type StubbedS3AWSSuite struct {
+ s3server *httptest.Server
+ metadata *httptest.Server
+ cluster *arvados.Cluster
+ handler *handler
+ volumes []*TestableS3AWSVolume
+}
+
+func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+ s.s3server = nil
+ s.metadata = nil
+ s.cluster = testCluster(c)
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
+ "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
+ }
+ s.handler = &handler{}
+}
+
+func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ // Use a negative raceWindow so s3test's 1-second
+ // timestamp precision doesn't confuse fixRace.
+ return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+ DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+ v.IndexPageSize = 3
+ for i := 0; i < 256; i++ {
+ v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ }
+ for _, spec := range []struct {
+ prefix string
+ expectMatch int
+ }{
+ {"", 256},
+ {"c", 16},
+ {"bc", 1},
+ {"abc", 0},
+ } {
+ buf := new(bytes.Buffer)
+ err := v.IndexTo(spec.prefix, buf)
+ c.Check(err, check.IsNil)
+
+ idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
+ c.Check(len(idx), check.Equals, spec.expectMatch+1)
+ c.Check(len(idx[len(idx)-1]), check.Equals, 0)
+ }
+}
+
+func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+ var header http.Header
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ header = r.Header
+ }))
+ defer stub.Close()
+
+ // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
+ // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+ vol := S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ AccessKey: "xxx",
+ SecretKey: "xxx",
+ Endpoint: stub.URL,
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := vol.check("")
+ // Our test S3 server uses the older 'Path Style'
+ vol.bucket.svc.ForcePathStyle = true
+
+ c.Check(err, check.IsNil)
+ err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+ exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+ // Literal example from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // but with updated timestamps
+ io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ }))
+ defer s.metadata.Close()
+
+ v := &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: s.metadata.URL + "/latest/api/token",
+ Endpoint: "http://localhost:12345",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := v.check(s.metadata.URL + "/latest")
+ creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+ c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+ c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusNotFound)
+ }))
+ deadv := &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: s.metadata.URL + "/fake-metadata/test-role",
+ Endpoint: "http://localhost:12345",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err = deadv.check(s.metadata.URL + "/latest")
+ _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+ c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+ c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+}
+
+func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ stats := func() string {
+ buf, err := json.Marshal(v.InternalStats())
+ c.Check(err, check.IsNil)
+ return string(buf)
+ }
+
+ c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.NotNil)
+ c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+ err = v.Put(context.Background(), loc, []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+ c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
+
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
+type s3AWSBlockingHandler struct {
+ requested chan *http.Request
+ unblock chan struct{}
+}
+
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
+ // Accept PutBucket ("PUT /bucketname/"), called by
+ // newTestableVolume
+ return
+ }
+ if h.requested != nil {
+ h.requested <- r
+ }
+ if h.unblock != nil {
+ <-h.unblock
+ }
+ http.Error(w, "nothing here", http.StatusNotFound)
+}
+
+func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := make([]byte, 3)
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ _, err := v.Get(ctx, loc, buf)
+ return err
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("bar")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ return v.Compare(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("foo")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ return v.Put(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+ handler := &s3AWSBlockingHandler{}
+ s.s3server = httptest.NewServer(handler)
+ defer s.s3server.Close()
+
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ handler.requested = make(chan *http.Request)
+ handler.unblock = make(chan struct{})
+ defer close(handler.unblock)
+
+ doneFunc := make(chan struct{})
+ go func() {
+ err := testFunc(ctx, v)
+ c.Check(err, check.Equals, context.Canceled)
+ close(doneFunc)
+ }()
+
+ timeout := time.After(10 * time.Second)
+
+ // Wait for the stub server to receive a request, meaning
+ // Get() is waiting for an s3 operation.
+ select {
+ case <-timeout:
+ c.Fatal("timed out waiting for test func to call our handler")
+ case <-doneFunc:
+ c.Fatal("test func finished without even calling our handler!")
+ case <-handler.requested:
+ }
+
+ cancel()
+
+ select {
+ case <-timeout:
+ c.Fatal("timed out")
+ case <-doneFunc:
+ }
+}
+
+func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+ s.cluster.Collections.BlobTrashLifetime.Set("1h")
+ s.cluster.Collections.BlobSigningTTL.Set("1h")
+
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ var none time.Time
+
+ putS3Obj := func(t time.Time, key string, data []byte) {
+ if t == none {
+ return
+ }
+ v.serverClock.now = &t
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+ _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(key),
+ Body: bytes.NewReader(data),
+ })
+ if err != nil {
+ panic(err)
+ }
+ v.serverClock.now = nil
+ _, err = v.Head(key)
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ t0 := time.Now()
+ nextKey := 0
+ for _, scenario := range []struct {
+ label string
+ dataT time.Time
+ recentT time.Time
+ trashT time.Time
+ canGet bool
+ canTrash bool
+ canGetAfterTrash bool
+ canUntrash bool
+ haveTrashAfterEmpty bool
+ freshAfterEmpty bool
+ }{
+ {
+ "No related objects",
+ none, none, none,
+ false, false, false, false, false, false,
+ },
+ {
+ // Stored by older version, or there was a
+ // race between EmptyTrash and Put: Trash is a
+ // no-op even though the data object is very
+ // old
+ "No recent/X",
+ t0.Add(-48 * time.Hour), none, none,
+ true, true, true, false, false, false,
+ },
+ {
+ "Not trash, but old enough to be eligible for trash",
+ t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+ true, true, false, false, false, false,
+ },
+ {
+ "Not trash, and not old enough to be eligible for trash",
+ t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+ true, true, true, false, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist, due to recent race between Trash and Put",
+ t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+ true, true, true, true, true, false,
+ },
+ {
+ "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+ true, false, true, true, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, true, true,
+ },
+ {
+ "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
+ t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+ true, true, true, true, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
+ t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, false, false,
+ },
+ {
+ "Trash, not yet eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+ false, false, false, true, true, false,
+ },
+ {
+ "Trash, not yet eligible for deletion, prone to races",
+ none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ false, false, false, true, true, false,
+ },
+ {
+ "Trash, eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+ false, false, false, true, false, false,
+ },
+ {
+ "Erroneously trashed during a race, detected before BlobTrashLifetime",
+ none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
+ none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+ none, none, t0.Add(-time.Minute),
+ false, false, false, true, true, true,
+ },
+ } {
+ c.Log("Scenario: ", scenario.label)
+
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setupScenario := func() (string, []byte) {
+ nextKey++
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
+ c.Log("\t", loc)
+ putS3Obj(scenario.dataT, loc, blk)
+ putS3Obj(scenario.recentT, "recent/"+loc, nil)
+ putS3Obj(scenario.trashT, "trash/"+loc, blk)
+ v.serverClock.now = &t0
+ return loc, blk
+ }
+
+ // Check canGet
+ loc, blk := setupScenario()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Trash, then check canTrash and canGetAfterTrash
+ loc, _ = setupScenario()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Untrash, then check canUntrash
+ loc, _ = setupScenario()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err, check.IsNil)
+ }
+
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
+ loc, _ = setupScenario()
+ v.EmptyTrash()
+ _, err = v.Head("trash/" + loc)
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
+ loc, blk = setupScenario()
+ err = v.Put(context.Background(), loc, blk)
+ c.Check(err, check.IsNil)
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+}
+
+type TestableS3AWSVolume struct {
+ *S3AWSVolume
+ server *httptest.Server
+ c *check.C
+ serverClock *s3AWSFakeClock
+}
+
+type LogrusLog struct {
+ log *logrus.FieldLogger
+}
+
+func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+ switch level {
+ case gofakes3.LogErr:
+ (*l.log).Errorln(v...)
+ case gofakes3.LogWarn:
+ (*l.log).Warnln(v...)
+ case gofakes3.LogInfo:
+ (*l.log).Infoln(v...)
+ default:
+ panic("unknown level")
+ }
+}
+
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+
+ clock := &s3AWSFakeClock{}
+ // fake s3
+ backend := s3mem.New(s3mem.WithTimeSource(clock))
+
+ // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
+ /* logger := new(LogrusLog)
+ ctxLogger := ctxlog.FromContext(context.Background())
+ logger.log = &ctxLogger */
+ faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
+ srv := httptest.NewServer(faker.Server())
+
+ endpoint := srv.URL
+ if s.s3server != nil {
+ endpoint = s.s3server.URL
+ }
+
+ iamRole, accessKey, secretKey := "", "xxx", "xxx"
+ if s.metadata != nil {
+ iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+ }
+
+ v := &TestableS3AWSVolume{
+ S3AWSVolume: &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: iamRole,
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ Bucket: S3AWSTestBucketName,
+ Endpoint: endpoint,
+ Region: "test-region-1",
+ LocationConstraint: true,
+ UnsafeDelete: true,
+ IndexPageSize: 1000,
+ },
+ cluster: cluster,
+ volume: volume,
+ logger: ctxlog.TestLogger(c),
+ metrics: metrics,
+ },
+ c: c,
+ server: srv,
+ serverClock: clock,
+ }
+ c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+ // Our test S3 server uses the older 'Path Style'
+ v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+ // Create the testbucket
+ input := &s3.CreateBucketInput{
+ Bucket: aws.String(S3AWSTestBucketName),
+ }
+ req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+ _, err := req.Send(context.Background())
+ c.Assert(err, check.IsNil)
+ // We couldn't set RaceWindow until now because check()
+ // rejects negative values.
+ v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+ return v
+}
+
+// PutRaw skips the ContentMD5 test
+func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+
+ r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ u.PartSize = 5 * 1024 * 1024
+ u.Concurrency = 13
+ })
+
+ _, err := uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ Body: r,
+ })
+ if err != nil {
+ v.logger.Printf("PutRaw: %s: %+v", loc, err)
+ }
+
+ empty := bytes.NewReader([]byte{})
+ _, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + loc),
+ Body: empty,
+ })
+ if err != nil {
+ v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ }
+}
+
+// TouchWithDate turns back the clock while doing a Touch(). We assume
+// there are no other operations happening on the same s3test server
+// while we do this.
+func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+ v.serverClock.now = &lastPut
+
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+ empty := bytes.NewReader([]byte{})
+ _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + locator),
+ Body: empty,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ v.serverClock.now = nil
+}
+
+func (v *TestableS3AWSVolume) Teardown() {
+ v.server.Close()
+}
+
+func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+ return "get", "put"
+}