Merge branch '21388-arvados-3.0-distros-docs'
[arvados.git] / services / keepstore / s3_volume.go
index dc857c32646b2aced992243122b94750607cf4e8..dd4666039dd2c3821762f6d83ed3543917acd4cf 100644
@@ -13,6 +13,7 @@ import (
        "errors"
        "fmt"
        "io"
        "errors"
        "fmt"
        "io"
+       "net/url"
        "os"
        "regexp"
        "strings"
        "os"
        "regexp"
        "strings"
@@ -22,13 +23,13 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/aws/aws-sdk-go-v2/aws"
-       "github.com/aws/aws-sdk-go-v2/aws/awserr"
-       "github.com/aws/aws-sdk-go-v2/aws/defaults"
-       "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
-       "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
-       "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+       v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
+       "github.com/aws/aws-sdk-go-v2/config"
+       "github.com/aws/aws-sdk-go-v2/credentials"
+       "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
        "github.com/aws/aws-sdk-go-v2/service/s3"
        "github.com/aws/aws-sdk-go-v2/service/s3"
-       "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+       "github.com/aws/aws-sdk-go-v2/service/s3/types"
+       "github.com/aws/smithy-go"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -49,16 +50,19 @@ const (
 )
 
 var (
-       errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
-       s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-       s3AWSZeroTime        time.Time
+       errS3TrashDisabled        = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+       s3AWSKeepBlockRegexp      = regexp.MustCompile(`^[0-9a-f]{32}$`)
+       s3AWSZeroTime             time.Time
+       defaultEndpointResolverV2 = s3.NewDefaultEndpointResolverV2()
+
+       // Returned by an aws.EndpointResolverWithOptions to indicate
+       // that the default resolver should be used.
+       errEndpointNotOverridden = &aws.EndpointNotFoundError{Err: errors.New("endpoint not overridden")}
 )
 
 // s3Volume implements Volume using an S3 bucket.
 type s3Volume struct {
        arvados.S3VolumeDriverParameters
-       AuthToken      string    // populated automatically when IAMRole is used
-       AuthExpiration time.Time // populated automatically when IAMRole is used
 
        cluster    *arvados.Cluster
        volume     arvados.Volume
@@ -68,6 +72,9 @@ type s3Volume struct {
        bucket     *s3Bucket
        region     string
        startOnce  sync.Once
+
+       overrideEndpoint *aws.Endpoint
+       usePathStyle     bool // used by test suite
 }
 
 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
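
Editorial aside, not part of the diff: aws-sdk-go-v2 treats a resolver error that wraps aws.EndpointNotFoundError as "no override here" and falls back to its default endpoint resolution, which is why errEndpointNotOverridden above can be allocated once and reused. A minimal sketch of that convention (customURL is a hypothetical stand-in):

	customURL := "https://s3.example.com" // hypothetical override
	resolver := aws.EndpointResolverWithOptionsFunc(
		func(service, region string, options ...interface{}) (aws.Endpoint, error) {
			if service == "S3" && customURL != "" {
				return aws.Endpoint{URL: customURL, HostnameImmutable: true}, nil
			}
			// Returning an error that wraps aws.EndpointNotFoundError
			// tells the SDK to use its default resolver instead.
			return aws.Endpoint{}, &aws.EndpointNotFoundError{}
		})
	_ = resolver // wired up later via o.EndpointResolverWithOptions
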
@@ -114,12 +121,15 @@ func news3Volume(params newVolumeParams) (volume, error) {
 }
 
 func (v *s3Volume) translateError(err error) error {
-       if _, ok := err.(*aws.RequestCanceledError); ok {
+       if cerr := (interface{ CanceledError() bool })(nil); errors.As(err, &cerr) && cerr.CanceledError() {
+               // *aws.RequestCanceledError and *smithy.CanceledError
+               // implement this interface.
                return context.Canceled
-       } else if aerr, ok := err.(awserr.Error); ok {
-               if aerr.Code() == "NotFound" {
-                       return os.ErrNotExist
-               } else if aerr.Code() == "NoSuchKey" {
+       }
+       var aerr smithy.APIError
+       if errors.As(err, &aerr) {
+               switch aerr.ErrorCode() {
+               case "NotFound", "NoSuchKey":
                        return os.ErrNotExist
                }
        }
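
Aside on why translateError matches string codes rather than typed errors: GetObject failures can surface as the typed *types.NoSuchKey, but HeadObject reports a missing key only as a generic API error whose ErrorCode() is "NotFound", so the switch above has to cover both spellings. A hedged sketch of the typed alternative, for illustration only:

	var nsk *types.NoSuchKey
	if errors.As(err, &nsk) {
		// the object does not exist (GetObject and friends)
	}
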
@@ -140,20 +150,17 @@ func (v *s3Volume) safeCopy(dst, src string) error {
                Key:         aws.String(dst),
        }
 
-       req := v.bucket.svc.CopyObjectRequest(input)
-       resp, err := req.Send(context.Background())
+       resp, err := v.bucket.svc.CopyObject(context.Background(), input)
 
        err = v.translateError(err)
        if os.IsNotExist(err) {
                return err
        } else if err != nil {
                return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
-       }
-
-       if resp.CopyObjectResult.LastModified == nil {
-               return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
-       } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
-               return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+       } else if resp.CopyObjectResult.LastModified == nil {
+               return fmt.Errorf("PutCopy(%q ← %q): succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
+       } else if skew := time.Now().UTC().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
+               return fmt.Errorf("PutCopy succeeded but returned old timestamp %s (skew %v > max %v, now %s)", resp.CopyObjectResult.LastModified, skew, maxClockSkew, time.Now())
        }
        return nil
 }
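
Context for the skew check, not part of the diff: safeCopy implements "touch" by copying an object onto itself and then trusting the returned LastModified only if it is within maxClockSkew of local time, which guards against S3-compatible gateways that echo a stale timestamp. A sketch of the self-copy idiom (client, bucket, and key are hypothetical):

	_, err := client.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String("example-bucket"),
		CopySource: aws.String("example-bucket/" + key),
		Key:        aws.String(key),
		// REPLACE forces the object to be rewritten, refreshing LastModified.
		MetadataDirective: types.MetadataDirectiveReplace,
	})
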
@@ -173,28 +180,18 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
                return errors.New("DriverParameters: V2Signature is not supported")
        }
 
-       defaultResolver := endpoints.NewDefaultResolver()
-
-       cfg := defaults.Config()
-
        if v.Endpoint == "" && v.Region == "" {
                return fmt.Errorf("AWS region or endpoint must be specified")
-       } else if v.Endpoint != "" || ec2metadataHostname != "" {
-               myCustomResolver := func(service, region string) (aws.Endpoint, error) {
-                       if v.Endpoint != "" && service == "s3" {
-                               return aws.Endpoint{
-                                       URL:           v.Endpoint,
-                                       SigningRegion: region,
-                               }, nil
-                       } else if service == "ec2metadata" && ec2metadataHostname != "" {
-                               return aws.Endpoint{
-                                       URL: ec2metadataHostname,
-                               }, nil
-                       } else {
-                               return defaultResolver.ResolveEndpoint(service, region)
-                       }
+       } else if v.Endpoint != "" {
+               _, err := url.Parse(v.Endpoint)
+               if err != nil {
+                       return fmt.Errorf("error parsing custom S3 endpoint %q: %w", v.Endpoint, err)
+               }
+               v.overrideEndpoint = &aws.Endpoint{
+                       URL:               v.Endpoint,
+                       HostnameImmutable: true,
+                       Source:            aws.EndpointSourceCustom,
                }
-               cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
        }
        if v.Region == "" {
                // Endpoint is already specified (otherwise we would
@@ -203,7 +200,6 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
                // SignatureVersions.
                v.Region = "us-east-1"
        }
-       cfg.Region = v.Region
 
        // Zero timeouts mean "wait forever", which is a bad
        // default. Default to long timeouts instead.
@@ -214,17 +210,65 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
                v.ReadTimeout = s3DefaultReadTimeout
        }
 
-       creds := aws.NewChainProvider(
-               []aws.CredentialsProvider{
-                       aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
-                       ec2rolecreds.New(ec2metadata.New(cfg)),
-               })
-
-       cfg.Credentials = creds
+       cfg, err := config.LoadDefaultConfig(context.TODO(),
+               config.WithRegion(v.Region),
+               config.WithCredentialsCacheOptions(func(o *aws.CredentialsCacheOptions) {
+                       // (from aws-sdk-go-v2 comments) "allow the
+                       // credentials to trigger refreshing prior to
+                       // the credentials actually expiring. This is
+                       // beneficial so race conditions with expiring
+                       // credentials do not cause request to fail
+                       // unexpectedly due to ExpiredTokenException
+                       // exceptions."
+                       //
+                       // (from
+                       // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+                       // "We make new credentials available at least
+                       // five minutes before the expiration of the
+                       // old credentials."
+                       o.ExpiryWindow = 5 * time.Minute
+               }),
+               func(o *config.LoadOptions) error {
+                       if v.AccessKeyID == "" && v.SecretAccessKey == "" {
+                               // Use default sdk behavior (IAM / IMDS)
+                               return nil
+                       }
+                       v.logger.Debug("using static credentials")
+                       o.Credentials = credentials.StaticCredentialsProvider{
+                               Value: aws.Credentials{
+                                       AccessKeyID:     v.AccessKeyID,
+                                       SecretAccessKey: v.SecretAccessKey,
+                                       Source:          "Arvados configuration",
+                               },
+                       }
+                       return nil
+               },
+               func(o *config.LoadOptions) error {
+                       if ec2metadataHostname != "" {
+                               o.EC2IMDSEndpoint = ec2metadataHostname
+                       }
+                       if v.overrideEndpoint != nil {
+                               o.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
+                                       if service == "S3" {
+                                               return *v.overrideEndpoint, nil
+                                       }
+                                       return aws.Endpoint{}, errEndpointNotOverridden // use default resolver
+                               })
+                       }
+                       return nil
+               },
+       )
+       if err != nil {
+               return fmt.Errorf("error loading aws client config: %w", err)
+       }
 
        v.bucket = &s3Bucket{
                bucket: v.Bucket,
-               svc:    s3.New(cfg),
+               svc: s3.NewFromConfig(cfg, func(o *s3.Options) {
+                       if v.usePathStyle {
+                               o.UsePathStyle = true
+                       }
+               }),
        }
 
        // Set up prometheus metrics
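
A note on the UsePathStyle hook above, not part of the diff: path-style addressing ("host/bucket/key" instead of "bucket.host/key") is typically what non-AWS, S3-compatible test servers require. A minimal sketch of the same client construction, with cfg as loaded above:

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.UsePathStyle = true // needed by most local S3 emulators
	})
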
@@ -247,7 +291,7 @@ func (v *s3Volume) EmptyTrash() {
        // Define "ready to delete" as "...when EmptyTrash started".
        startT := time.Now()
 
-       emptyOneKey := func(trash *s3.Object) {
+       emptyOneKey := func(trash *types.Object) {
                key := strings.TrimPrefix(*trash.Key, "trash/")
                loc, isblk := v.isKeepBlock(key)
                if !isblk {
@@ -321,7 +365,7 @@ func (v *s3Volume) EmptyTrash() {
        }
 
        var wg sync.WaitGroup
-       todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+       todo := make(chan *types.Object, v.cluster.Collections.BlobDeleteConcurrency)
        for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
                wg.Add(1)
                go func() {
@@ -395,8 +439,7 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
                Key:    aws.String(key),
        }
 
-       req := v.bucket.svc.HeadObjectRequest(input)
-       res, err := req.Send(context.TODO())
+       res, err := v.bucket.svc.HeadObject(context.Background(), input)
 
        v.bucket.stats.TickOps("head")
        v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
@@ -405,8 +448,7 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
        if err != nil {
                return nil, v.translateError(err)
        }
-       result = res.HeadObjectOutput
-       return
+       return res, nil
 }
 
 // BlockRead reads a Keep block that has been stored as a block blob
@@ -443,11 +485,11 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) er
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
-       downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+       downloader := manager.NewDownloader(v.bucket.svc, func(u *manager.Downloader) {
                u.PartSize = s3downloaderPartSize
                u.Concurrency = s3downloaderReadConcurrency
        })
-       count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
+       count, err := downloader.Download(ctx, dst, &s3.GetObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
        })
@@ -465,7 +507,7 @@ func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) err
                r = bytes.NewReader(nil)
        }
 
-       uploadInput := s3manager.UploadInput{
+       uploadInput := s3.PutObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
                Body:   r,
@@ -485,20 +527,15 @@ func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) err
        // throughput, better than higher concurrency (10 or 13) by ~5%.
        // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
        // is detrimental to throughput (minus ~15%).
-       uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+       uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
                u.PartSize = s3uploaderPartSize
                u.Concurrency = s3uploaderWriteConcurrency
        })
 
-       // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
-       // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
-       // block, so there is no extra memory use to be concerned about. See
-       // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
-       // calculating the Sha-256 because we don't need it; we already use md5sum
-       // hashes that match the name of the block.
-       _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
-               r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
-       }))
+       _, err := uploader.Upload(ctx, &uploadInput,
+               // Avoid precomputing SHA256 before sending.
+               manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
+       )
 
        v.bucket.stats.TickOps("put")
        v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
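
Aside: the swapped-in middleware sets the SigV4 payload hash to UNSIGNED-PAYLOAD, so the uploader no longer streams every part through SHA-256; Keep's md5-derived block names already provide integrity checking. A sketch of the same option on a one-off request (client, bucket, key, and data are hypothetical):

	_, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String("example-bucket"),
		Key:    aws.String("example-key"),
		Body:   bytes.NewReader([]byte("example data")),
	}, s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware))
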
@@ -528,13 +565,13 @@ type s3awsLister struct {
        PageSize          int
        Stats             *s3awsbucketStats
        ContinuationToken string
-       buf               []s3.Object
+       buf               []types.Object
        err               error
 }
 
 // First fetches the first page and returns the first item. It returns
 // nil if the response is the empty set or an error occurs.
-func (lister *s3awsLister) First() *s3.Object {
+func (lister *s3awsLister) First() *types.Object {
        lister.getPage()
        return lister.pop()
 }
@@ -542,7 +579,7 @@ func (lister *s3awsLister) First() *s3.Object {
 // Next returns the next item, fetching the next page if necessary. It
 // returns nil if the last available item has already been fetched, or
 // an error occurs.
-func (lister *s3awsLister) Next() *s3.Object {
+func (lister *s3awsLister) Next() *types.Object {
        if len(lister.buf) == 0 && lister.ContinuationToken != "" {
                lister.getPage()
        }
@@ -562,22 +599,22 @@ func (lister *s3awsLister) getPage() {
        if lister.ContinuationToken == "" {
                input = &s3.ListObjectsV2Input{
                        Bucket:  aws.String(lister.Bucket.bucket),
-                       MaxKeys: aws.Int64(int64(lister.PageSize)),
+                       MaxKeys: aws.Int32(int32(lister.PageSize)),
                        Prefix:  aws.String(lister.Prefix),
                }
        } else {
                input = &s3.ListObjectsV2Input{
                        Bucket:            aws.String(lister.Bucket.bucket),
-                       MaxKeys:           aws.Int64(int64(lister.PageSize)),
+                       MaxKeys:           aws.Int32(int32(lister.PageSize)),
                        Prefix:            aws.String(lister.Prefix),
                        ContinuationToken: &lister.ContinuationToken,
                }
        }
 
-       req := lister.Bucket.svc.ListObjectsV2Request(input)
-       resp, err := req.Send(context.Background())
+       resp, err := lister.Bucket.svc.ListObjectsV2(context.Background(), input)
        if err != nil {
-               if aerr, ok := err.(awserr.Error); ok {
+               var aerr smithy.APIError
+               if errors.As(err, &aerr) {
                        lister.err = aerr
                } else {
                        lister.err = err
@@ -590,7 +627,7 @@ func (lister *s3awsLister) getPage() {
        } else {
                lister.ContinuationToken = ""
        }
-       lister.buf = make([]s3.Object, 0, len(resp.Contents))
+       lister.buf = make([]types.Object, 0, len(resp.Contents))
        for _, key := range resp.Contents {
                if !strings.HasPrefix(*key.Key, lister.Prefix) {
                        lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
@@ -600,7 +637,7 @@ func (lister *s3awsLister) getPage() {
        }
 }
 
-func (lister *s3awsLister) pop() (k *s3.Object) {
+func (lister *s3awsLister) pop() (k *types.Object) {
        if len(lister.buf) > 0 {
                k = &lister.buf[0]
                lister.buf = lister.buf[1:]
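
Aside, not part of the diff: the hand-rolled ContinuationToken bookkeeping in getPage predates this migration; with aws-sdk-go-v2 an equivalent listing loop could lean on the SDK's paginator instead. A hedged sketch (client and names hypothetical):

	p := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
		Bucket:  aws.String("example-bucket"),
		Prefix:  aws.String("prefix/"),
		MaxKeys: aws.Int32(1000),
	})
	for p.HasMorePages() {
		page, err := p.NextPage(context.Background())
		if err != nil {
			break // handle/translate the error as above
		}
		for _, obj := range page.Contents {
			_ = obj // process *obj.Key
		}
	}
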
@@ -758,8 +795,7 @@ func (b *s3Bucket) Del(path string) error {
                Bucket: aws.String(b.bucket),
                Key:    aws.String(path),
        }
-       req := b.svc.DeleteObjectRequest(input)
-       _, err := req.Send(context.Background())
+       _, err := b.svc.DeleteObject(context.Background(), input)
        b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
@@ -817,12 +853,11 @@ func (s *s3awsbucketStats) TickErr(err error) {
                return
        }
        errType := fmt.Sprintf("%T", err)
-       if aerr, ok := err.(awserr.Error); ok {
-               if reqErr, ok := err.(awserr.RequestFailure); ok {
-                       // A service error occurred
-                       errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+       if aerr := smithy.APIError(nil); errors.As(err, &aerr) {
+               if rerr := interface{ HTTPStatusCode() int }(nil); errors.As(err, &rerr) {
+                       errType = errType + fmt.Sprintf(" %d %s", rerr.HTTPStatusCode(), aerr.ErrorCode())
                } else {
-                       errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+                       errType = errType + fmt.Sprintf(" 000 %s", aerr.ErrorCode())
                }
        }
        s.statsTicker.TickErr(err, errType)
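
For reference, not part of the diff: the anonymous interface{ HTTPStatusCode() int } above matches the concrete *awshttp.ResponseError that aws-sdk-go-v2 wraps around HTTP-level failures. An equivalent typed check, assuming the import awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http":

	var re *awshttp.ResponseError
	if errors.As(err, &re) {
		status := re.HTTPStatusCode() // e.g. 404 or 503
		_ = status
	}
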