AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
- SSH: &compute.SSHConfiguration{
- PublicKeys: &[]compute.SSHPublicKey{
- {
- Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
- KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
- },
- },
- },
},
CustomData: &customData,
},
},
}
+ if publicKey != nil {
+ vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH = &compute.SSHConfiguration{
+ PublicKeys: &[]compute.SSHPublicKey{
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
+ KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
+ },
+ },
+ }
+ }
+
if instanceType.Preemptible {
// Setting maxPrice to -1 is the equivalent of paying spot price, up to the
// normal price. This means the node will not be pre-empted for price
const testNamePrefix = "compute-test123-"
-type VirtualMachinesClientStub struct{}
+// VirtualMachinesClientStub is a test stand-in for the Azure virtual
+// machines client. It keeps a copy of the parameters passed to the
+// most recent createOrUpdate call so tests can inspect them.
+type VirtualMachinesClientStub struct {
+ vmParameters compute.VirtualMachine
+}
-func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
+// createOrUpdate fakes VM creation: it points the ID and Name of
+// parameters at VMName, saves the result in stub.vmParameters, and
+// returns it with a nil error. ctx and resourceGroupName are unused.
+func (stub *VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
parameters.ID = &VMName
parameters.Name = &VMName
+ stub.vmParameters = parameters
return parameters, nil
}
var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
-func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) {
+func GetInstanceSet() (*azureInstanceSet, cloud.ImageID, arvados.Cluster, error) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
}
ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
- return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
+ return ap.(*azureInstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
ap := azureInstanceSet{
azconfig: azureInstanceSetConfig{
tags := inst.Tags()
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ c.Check(ap.vmClient.(*VirtualMachinesClientStub).vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH, check.NotNil)
+ }
instPreemptable, err := ap.Create(cluster.InstanceTypes["tinyp"],
img, map[string]string{
"TestTagName": "test tag value",
- }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", nil)
c.Assert(err, check.IsNil)
tags = instPreemptable.Tags()
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("instPreemptable.String()=%v Address()=%v Tags()=%v", instPreemptable.String(), instPreemptable.Address(), tags)
-
+ if *live == "" {
+ // Should not have set SSH option, because publickey
+ // arg was nil
+ c.Check(ap.vmClient.(*VirtualMachinesClientStub).vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH, check.IsNil)
+ }
}
func (*AzureInstanceSetSuite) TestListInstances(c *check.C) {
c.Fatal("Error making provider", err)
}
- ap.(*azureInstanceSet).manageNics()
+ ap.manageNics()
ap.Stop()
}
c.Fatal("Error making provider", err)
}
- ap.(*azureInstanceSet).manageBlobs()
+ ap.manageBlobs()
ap.Stop()
}
c.Fatal("Error making provider", err)
}
- _, err = ap.(*azureInstanceSet).netClient.delete(context.Background(), "fakefakefake", "fakefakefake")
+ _, err = ap.netClient.delete(context.Background(), "fakefakefake", "fakefakefake")
de, ok := err.(autorest.DetailedError)
if ok {
initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
- keyname, err := instanceSet.getKeyName(publicKey)
- if err != nil {
- return nil, err
- }
-
ec2tags := []*ec2.Tag{}
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
InstanceType: &instanceType.ProviderType,
MaxCount: aws.Int64(1),
MinCount: aws.Int64(1),
- KeyName: &keyname,
NetworkInterfaces: []*ec2.InstanceNetworkInterfaceSpecification{
{
UserData: aws.String(base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))),
}
+ if publicKey != nil {
+ keyname, err := instanceSet.getKeyName(publicKey)
+ if err != nil {
+ return nil, err
+ }
+ rii.KeyName = &keyname
+ }
+
if instanceType.AddedScratch > 0 {
rii.BlockDeviceMappings = []*ec2.BlockDeviceMapping{{
DeviceName: aws.String("/dev/xvdt"),
}
+// ec2stub is a test double for the EC2 client. In addition to its
+// existing fields it records the input of every ImportKeyPair and
+// DescribeKeyPairs call, so tests can check how many times the key
+// pair APIs were used.
type ec2stub struct {
- c *check.C
- reftime time.Time
+ c *check.C
+ reftime time.Time
+ importKeyPairCalls []*ec2.ImportKeyPairInput
+ describeKeyPairsCalls []*ec2.DescribeKeyPairsInput
}
+// ImportKeyPair records input in e.importKeyPairCalls and returns a
+// nil output and nil error.
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
+ e.importKeyPairCalls = append(e.importKeyPairCalls, input)
return nil, nil
}
+// DescribeKeyPairs records input in e.describeKeyPairsCalls and
+// returns an empty (zero-value) output, i.e. no existing key pairs.
func (e *ec2stub) DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error) {
+ e.describeKeyPairsCalls = append(e.describeKeyPairsCalls, input)
return &ec2.DescribeKeyPairsOutput{}, nil
}
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 1)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 1)
+ }
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
ap, img, cluster := GetInstanceSet(c)
- pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
-
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
"TestTagName": "test tag value",
- }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", nil)
c.Assert(err, check.IsNil)
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ // Should not have called key pair APIs, because
+ // publickey arg was nil
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 0)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 0)
+ }
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
# Maximum bytes that may be logged by a single job. Log bytes that are
# silenced by throttling are not counted against this total.
+ # If you set this to zero, each container will only create a single
+ # log on the API server, noting for users that logging is throttled.
LimitLogBytesPerJob: 67108864
LogPartialLineThrottlePeriod: 5s
# version of crunch-run installed; see CrunchRunCommand above.
DeployRunnerBinary: "/proc/self/exe"
+ # Install the Dispatcher's SSH public key (derived from
+ # DispatchPrivateKey) when creating new cloud
+ # instances. Change this to false if you are using a different
+ # mechanism to pre-install the public key on new instances.
+ DeployPublicKey: true
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
// Content-Length depends on encoding.
"Content-Length": true,
+
+ // Defend against Rails vulnerability CVE-2023-22795 -
+ // we don't use this functionality anyway, so it costs us nothing.
+ // <https://discuss.rubyonrails.org/t/cve-2023-22795-possible-redos-based-dos-vulnerability-in-action-dispatch/82118>
+ "If-None-Match": true,
}
type ResponseFilter func(*http.Response, error) (*http.Response, error)
// manifest, err := (&copier{...}).Copy()
type copier struct {
client *arvados.Client
- arvClient IArvadosClient
keepClient IKeepClient
hostOutputDir string
ctrOutputDir string
}
func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
- cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+ cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return 0, err
// copy, relative to its mount point -- ".", "./foo.txt", ...
srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
+ // outputRelPath is the path, relative to the output directory,
+ // that corresponds to the path in the output collection where
+ // the file will go. It is used only for logging.
+ var outputRelPath = ""
+ if strings.HasPrefix(src, cp.ctrOutputDir) {
+ outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
+ }
+ if outputRelPath == "" {
+ // blank means copy a whole directory, so replace it
+ // with a wildcard to make it a little clearer what's
+ // going on since outputRelPath is only used for logging
+ outputRelPath = "*"
+ }
+
switch {
case srcMount.ExcludeFromOutput:
case srcMount.Kind == "tmp":
case srcMount.Kind != "collection":
return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
case !srcMount.Writable:
+ cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
mft, err := cp.getManifest(srcMount.PortableDataHash)
if err != nil {
return err
}
cp.manifest += mft.Extract(srcRelPath, dest).Text
default:
+ cp.logger.Printf("copying %q", outputRelPath)
hostRoot, err := cp.hostRoot(srcRoot)
if err != nil {
return err
return mft, nil
}
var coll arvados.Collection
- err := cp.arvClient.Get("collections", pdh, nil, &coll)
+ err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
if err != nil {
return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
}
"syscall"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
func (s *copierSuite) SetUpTest(c *check.C) {
tmpdir := c.MkDir()
- api, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.IsNil)
s.log = bytes.Buffer{}
s.cp = copier{
client: arvados.NewClientFromEnv(),
- arvClient: api,
hostOutputDir: tmpdir,
ctrOutputDir: "/ctr/outdir",
mounts: map[string]arvados.Mount{
if err != nil {
return nil, fmt.Errorf("creating temp dir: %v", err)
}
- err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+ err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
if err != nil {
return nil, err
}
txt, err := (&copier{
client: runner.containerClient,
- arvClient: runner.ContainerArvClient,
keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
log.Printf("%s: %v", containerUUID, err)
return 1
}
- api.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ api.Retries = 10
kc, err := keepclient.MakeKeepClient(api)
if err != nil {
fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
return conf
}
- arv.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ arv.Retries = 10
var ctr arvados.Container
err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
if err != nil {
"math/rand"
"net/http"
"net/http/httptest"
+ "net/http/httputil"
+ "net/url"
"os"
"os/exec"
"path"
"git.arvados.org/arvados.git/sdk/go/manifest"
. "gopkg.in/check.v1"
+ git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
+ git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
)
// Gocheck boilerplate
return nil, nil
}
+// apiStubServer is an http.Handler used in tests as a fake Arvados
+// API endpoint. It answers a handful of requests itself (see
+// ServeHTTP) and forwards everything else through proxy.
+type apiStubServer struct {
+ // server is the TLS test server that serves this handler.
+ server *httptest.Server
+ // proxy handles every request that ServeHTTP does not answer itself.
+ proxy *httputil.ReverseProxy
+ // intercept, if non-nil, is called first for every request;
+ // returning true means the request has been fully handled.
+ intercept func(http.ResponseWriter, *http.Request) bool
+
+ // container is returned as JSON for GET requests on its own UUID.
+ container arvados.Container
+ // logs accumulates the text of posted log entries, keyed by
+ // event type.
+ logs map[string]string
+}
+
+// apiStub starts a TLS test server backed by a new apiStubServer and
+// returns an arvados.Client (otherwise configured from the
+// environment) whose APIHost points at that test server, along with
+// the stub itself.
+//
+// Requests the stub does not answer itself are reverse-proxied to
+// the API host that was configured in the environment, using the
+// insecure transport when the client is configured as Insecure.
+func apiStub() (*arvados.Client, *apiStubServer) {
+	client := arvados.NewClientFromEnv()
+	// logs must be a non-nil map: ServeHTTP assigns into it when it
+	// receives a POST to /arvados/v1/logs, and assigning to an entry
+	// of a nil map panics.
+	apistub := &apiStubServer{logs: map[string]string{}}
+	apistub.server = httptest.NewTLSServer(apistub)
+	apistub.proxy = httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "https", Host: client.APIHost})
+	if client.Insecure {
+		apistub.proxy.Transport = arvados.InsecureHTTPClient.Transport
+	}
+	client.APIHost = apistub.server.Listener.Addr().String()
+	return client, apistub
+}
+
+// ServeHTTP implements http.Handler. In order, it:
+//
+//   - gives apistub.intercept (if set) the chance to handle the
+//     request entirely;
+//   - records POSTed log entries in apistub.logs;
+//   - serves canned collection records for the known test PDHs;
+//   - serves apistub.container for a GET on its UUID;
+//   - forwards anything else to apistub.proxy.
+func (apistub *apiStubServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if apistub.intercept != nil && apistub.intercept(w, r) {
+ return
+ }
+ if r.Method == "POST" && r.URL.Path == "/arvados/v1/logs" {
+ var body struct {
+ Log struct {
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+ }
+ }
+ // The decode error is deliberately not checked: a body that
+ // is not valid JSON just appends an empty string under the
+ // empty event type.
+ json.NewDecoder(r.Body).Decode(&body)
+ // NOTE(review): apistub.logs must be non-nil here; assigning
+ // to an entry of a nil map panics.
+ apistub.logs[body.Log.EventType] += body.Log.Properties.Text
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+hwPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: hwManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+otherPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: otherManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+normalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: normalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+denormalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: denormalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/containers/"+apistub.container.UUID {
+ json.NewEncoder(w).Encode(apistub.container)
+ return
+ }
+ // Not one of the stubbed endpoints: pass the request through.
+ apistub.proxy.ServeHTTP(w, r)
+}
+
func (s *TestSuite) TestLoadImage(c *C) {
s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
s.runner.Container.Mounts = map[string]arvados.Mount{
}
return d, err
}
+ client, _ := apiStub()
s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
- return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, nil, nil
+ return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, client, nil
}
if extraMounts != nil && len(extraMounts) > 0 {
cr := s.runner
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
+ cr.containerClient, _ = apiStub()
cr.ContainerArvClient = &ArvTestClient{}
cr.ContainerKeepClient = &KeepTestClient{}
cr.Container.OutputStorageClasses = []string{"default"}
{
i = 0
cr.ArvMountPoint = ""
- (*GitMountSuite)(nil).useTestGitServer(c)
+ git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
cr.token = arvadostest.ActiveToken
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
// Number of consecutive "inspect container" failures before
// concluding Docker is unresponsive, giving up, and cancelling the
// container.
-const dockerWatchdogThreshold = 3
+const dockerWatchdogThreshold = 5
type dockerExecutor struct {
containerUUID string
// currently the minimum version we want to support.
client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
if watchdogInterval < 1 {
- watchdogInterval = time.Minute
+ watchdogInterval = time.Minute * 2
}
return &dockerExecutor{
containerUUID: containerUUID,
// ExtractTree extracts the specified tree into dir, which is an
// existing empty local directory.
-func (gm gitMount) extractTree(ac IArvadosClient, dir string, token string) error {
+func (gm gitMount) extractTree(ac *arvados.Client, dir string, token string) error {
err := gm.validate()
if err != nil {
return err
}
- baseURL, err := ac.Discovery("gitUrl")
+ dd, err := ac.DiscoveryDocument()
if err != nil {
- return fmt.Errorf("discover gitUrl from API: %s", err)
- } else if _, ok := baseURL.(string); !ok {
- return fmt.Errorf("discover gitUrl from API: expected string, found %T", baseURL)
+ return fmt.Errorf("error getting discovery document: %w", err)
}
-
- u, err := url.Parse(baseURL.(string))
+ u, err := url.Parse(dd.GitURL)
if err != nil {
- return fmt.Errorf("parse gitUrl %q: %s", baseURL, err)
+ return fmt.Errorf("parse gitUrl %q: %s", dd.GitURL, err)
}
u, err = u.Parse("/" + gm.UUID + ".git")
if err != nil {
- return fmt.Errorf("build git url from %q, %q: %s", baseURL, gm.UUID, err)
+ return fmt.Errorf("build git url from %q, %q: %s", dd.GitURL, gm.UUID, err)
}
store := memory.NewStorage()
repo, err := git.Init(store, osfs.New(dir))
import (
"io/ioutil"
- "net/url"
"os"
"path/filepath"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
var _ = check.Suite(&GitMountSuite{})
func (s *GitMountSuite) SetUpTest(c *check.C) {
- s.useTestGitServer(c)
-
var err error
s.tmpdir, err = ioutil.TempDir("", "")
c.Assert(err, check.IsNil)
+ git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
}
func (s *GitMountSuite) TearDownTest(c *check.C) {
}
// Commit fd3531f is crunch-run-tree-test
-func (s *GitMountSuite) TestextractTree(c *check.C) {
+func (s *GitMountSuite) TestExtractTree(c *check.C) {
gm := gitMount{
Path: "/",
UUID: arvadostest.Repository2UUID,
Commit: "fd3531f42995344f36c30b79f55f27b502f3d344",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ ac := arvados.NewClientFromEnv()
+ err := gm.extractTree(ac, s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.IsNil)
fnm := filepath.Join(s.tmpdir, "dir1/dir2/file with mode 0644")
UUID: arvadostest.Repository2UUID,
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.IsNil)
fnm := filepath.Join(s.tmpdir, "file only on testbranch")
UUID: "zzzzz-s0uqq-nonexistentrepo",
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, ".*repository not found.*")
UUID: arvadostest.Repository2UUID,
Commit: "bb66b6bb6b6bbb6b6b6b66b6b6b6b6b6b6b6b66b",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, ".*object not found.*")
UUID: arvadostest.Repository2UUID,
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
- c.Check(err, check.ErrorMatches, ".*gitUrl.*")
+ err := gm.extractTree(&arvados.Client{}, s.tmpdir, arvadostest.ActiveToken)
+ c.Check(err, check.ErrorMatches, ".*error getting discovery doc.*")
}
func (s *GitMountSuite) TestInvalid(c *check.C) {
matcher: ".*writable.*",
},
} {
- err := trial.gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := trial.gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
s.checkTmpdirContents(c, []string{})
c.Check(err, check.IsNil)
c.Check(names, check.DeepEquals, expect)
}
-
-func (*GitMountSuite) useTestGitServer(c *check.C) {
- git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
-
- loader := config.NewLoader(nil, ctxlog.TestLogger(c))
- cfg, err := loader.Load()
- c.Assert(err, check.IsNil)
- cluster, err := cfg.GetCluster("")
- c.Assert(err, check.IsNil)
-
- discoveryMap["gitUrl"] = (*url.URL)(&cluster.Services.GitHTTP.ExternalURL).String()
-}
s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
}
+// TestWriteLogsWithZeroBytesPerJob runs the shared rate-limit test
+// with crunchLimitLogBytesPerJob set to 0 and expects the "Exceeded
+// log limit 0 bytes" message.
+func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) {
+ s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)")
+}
+
func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
discoveryMap[throttleParam] = float64(throttleValue)
defer func() {
} else {
disp.sshKey = key
}
+ installPublicKey := disp.sshKey.PublicKey()
+ if !disp.Cluster.Containers.CloudVMs.DeployPublicKey {
+ installPublicKey = nil
+ }
instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
if err != nil {
}
dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
disp.instanceSet = instanceSet
- disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
"io/fs"
"io/ioutil"
"log"
+ "math"
"math/big"
+ mathrand "math/rand"
"net"
"net/http"
"net/url"
"os"
"regexp"
+ "strconv"
"strings"
"sync/atomic"
"time"
rclient := retryablehttp.NewClient()
rclient.HTTPClient = c.httpClient()
+ rclient.Backoff = exponentialBackoff
if c.Timeout > 0 {
rclient.RetryWaitMax = c.Timeout / 10
rclient.RetryMax = 32
}
}
+const minExponentialBackoffBase = time.Second
+
+// exponentialBackoff implements retryablehttp.Backoff using the
+// server-provided Retry-After header if available, otherwise
+// nearly-full jitter exponential backoff (similar to
+// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/),
+// in all cases respecting the provided min and max.
+//
+// Retry-After is consulted only on 429 and 503 responses. It may be
+// a whole number of seconds, or a timestamp in either time.RFC1123
+// layout or any HTTP date format accepted by http.ParseTime. A value
+// that is missing, unparseable, or works out to exactly zero falls
+// through to the jittered delay.
+//
+// For attemptNum > 0, min is raised to minExponentialBackoffBase, so
+// a zero min does not disable backoff between retries.
+func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
+	if attemptNum > 0 && min < minExponentialBackoffBase {
+		min = minExponentialBackoffBase
+	}
+	var t time.Duration
+	if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
+		if s := resp.Header.Get("Retry-After"); s != "" {
+			if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
+				// Saturate rather than multiply blindly: a
+				// very large header value would otherwise
+				// wrap around int64 and yield an arbitrary
+				// delay. The clamp below turns the saturated
+				// value into max (or min).
+				switch {
+				case sleep > int64(math.MaxInt64/time.Second):
+					t = math.MaxInt64
+				case sleep < int64(math.MinInt64/time.Second):
+					t = math.MinInt64
+				default:
+					t = time.Second * time.Duration(sleep)
+				}
+			} else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
+				t = time.Until(stamp)
+			} else if stamp, err := http.ParseTime(s); err == nil {
+				// Fallback for the remaining HTTP date
+				// layouts (RFC 850, ANSI C asctime).
+				t = time.Until(stamp)
+			}
+		}
+	}
+	if t == 0 {
+		// NOTE(review): for very large attemptNum the float
+		// product is outside the int64 range, and the result of
+		// converting it to time.Duration is implementation-defined.
+		// Left as is because callers/tests may rely on the
+		// current outcome -- confirm before changing.
+		jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
+		t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter)
+	}
+	switch {
+	case t < min:
+		return min
+	case t > max:
+		return max
+	default:
+		return t
+	}
+}
+
// DoAndDecode performs req and unmarshals the response (which must be
// JSON) into dst. Use this instead of RequestAndDecode if you need
// more control of the http.Request object.
"context"
"fmt"
"io/ioutil"
+ "math"
"math/rand"
"net/http"
"net/http/httptest"
err := s.client.RequestAndDecodeContext(ctx, &struct{}{}, http.MethodGet, "test", nil, nil)
c.Check(err, check.Equals, context.Canceled)
}
+
+// TestExponentialBackoff checks exponentialBackoff's handling of
+// attempt numbers (bounds and jitter), of Retry-After headers given
+// as seconds or as timestamps, and of a zero min.
+func (s *clientRetrySuite) TestExponentialBackoff(c *check.C) {
+ var min, max time.Duration
+ min, max = time.Second, 64*time.Second
+
+ // Attempt 0 with no response: exactly min, no jitter.
+ t := exponentialBackoff(min, max, 0, nil)
+ c.Check(t, check.Equals, min)
+
+ for e := float64(1); e < 5; e += 1 {
+ ok := false
+ for i := 0; i < 20; i++ {
+ t = exponentialBackoff(min, max, int(e), nil)
+ // Every returned value must be between min and
+ // min(min*2^e, max)
+ c.Check(t >= min, check.Equals, true)
+ c.Check(t <= min*time.Duration(math.Pow(2, e)), check.Equals, true)
+ c.Check(t <= max, check.Equals, true)
+ // Check that jitter is actually happening by
+ // checking that at least one in 20 trials is
+ // between min*2^(e-.75) and min*2^(e-.25)
+ jittermin := time.Duration(float64(min) * math.Pow(2, e-0.75))
+ jittermax := time.Duration(float64(min) * math.Pow(2, e-0.25))
+ c.Logf("min %v max %v e %v jittermin %v jittermax %v t %v", min, max, e, jittermin, jittermax, t)
+ if t > jittermin && t < jittermax {
+ ok = true
+ break
+ }
+ }
+ c.Check(ok, check.Equals, true)
+ }
+
+ // With a very large attempt number the result must stay
+ // strictly below max.
+ // NOTE(review): presumably this depends on how the
+ // out-of-range float-to-Duration conversion inside
+ // exponentialBackoff behaves -- confirm.
+ for i := 0; i < 20; i++ {
+ t := exponentialBackoff(min, max, 100, nil)
+ c.Check(t < max, check.Equals, true)
+ }
+
+ // Retry-After handling on a 429 response, with min=4s and
+ // max=10s.
+ for _, trial := range []struct {
+ retryAfter string
+ expect time.Duration
+ }{
+ {"1", time.Second * 4}, // minimum enforced
+ {"5", time.Second * 5}, // header used
+ {"55", time.Second * 10}, // maximum enforced
+ {"eleventy-nine", time.Second * 4}, // invalid header, exponential backoff used
+ {time.Now().UTC().Add(time.Second).Format(time.RFC1123), time.Second * 4}, // minimum enforced
+ {time.Now().UTC().Add(time.Minute).Format(time.RFC1123), time.Second * 10}, // maximum enforced
+ {time.Now().UTC().Add(-time.Minute).Format(time.RFC1123), time.Second * 4}, // minimum enforced
+ } {
+ c.Logf("trial %+v", trial)
+ t := exponentialBackoff(time.Second*4, time.Second*10, 0, &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{"Retry-After": {trial.retryAfter}}})
+ c.Check(t, check.Equals, trial.expect)
+ }
+ // 429 without any Retry-After header: min is used.
+ t = exponentialBackoff(time.Second*4, time.Second*10, 0, &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ })
+ c.Check(t, check.Equals, time.Second*4)
+
+ // A zero min is honored on attempt 0 but not on later attempts.
+ t = exponentialBackoff(0, max, 0, nil)
+ c.Check(t, check.Equals, time.Duration(0))
+ t = exponentialBackoff(0, max, 1, nil)
+ c.Check(t, check.Not(check.Equals), time.Duration(0))
+}
BootProbeCommand string
InstanceInitCommand string
DeployRunnerBinary string
+ DeployPublicKey bool
ImageID string
MaxCloudOpsPerSecond int
MaxProbesPerSecond int
vals.Set(k, string(m))
}
}
-
- retryable := false
- switch method {
- case "GET", "HEAD", "PUT", "OPTIONS", "DELETE":
- retryable = true
- }
-
- // Non-retryable methods such as POST are not safe to retry automatically,
- // so we minimize such failures by always using a new or recently active socket
- if !retryable {
- if time.Since(c.lastClosedIdlesAt) > MaxIdleConnectionDuration {
- c.lastClosedIdlesAt = time.Now()
- c.Client.Transport.(*http.Transport).CloseIdleConnections()
- }
- }
-
- // Make the request
var req *http.Request
- var resp *http.Response
-
- for attempt := 0; attempt <= c.Retries; attempt++ {
- if method == "GET" || method == "HEAD" {
- u.RawQuery = vals.Encode()
- if req, err = http.NewRequest(method, u.String(), nil); err != nil {
- return nil, err
- }
- } else {
- if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
- return nil, err
- }
- req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- }
-
- // Add api token header
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
- if c.RequestID != "" {
- req.Header.Add("X-Request-Id", c.RequestID)
- }
-
- resp, err = c.Client.Do(req)
- if err != nil {
- if retryable {
- time.Sleep(RetryDelay)
- continue
- } else {
- return nil, err
- }
- }
-
- if resp.StatusCode == http.StatusOK {
- return resp.Body, nil
+ if method == "GET" || method == "HEAD" {
+ u.RawQuery = vals.Encode()
+ if req, err = http.NewRequest(method, u.String(), nil); err != nil {
+ return nil, err
}
-
- defer resp.Body.Close()
-
- switch resp.StatusCode {
- case 408, 409, 422, 423, 500, 502, 503, 504:
- time.Sleep(RetryDelay)
- continue
- default:
- return nil, newAPIServerError(c.ApiServer, resp)
+ } else {
+ if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
+ return nil, err
}
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
}
-
- if resp != nil {
+ if c.RequestID != "" {
+ req.Header.Add("X-Request-Id", c.RequestID)
+ }
+ client := arvados.Client{
+ Client: c.Client,
+ APIHost: c.ApiServer,
+ AuthToken: c.ApiToken,
+ Insecure: c.ApiInsecure,
+ Timeout: 30 * RetryDelay * time.Duration(c.Retries),
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
+ defer resp.Body.Close()
return nil, newAPIServerError(c.ApiServer, resp)
}
- return nil, err
+ return resp.Body, nil
}
func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
// Call an API endpoint and parse the JSON response into an object.
//
-// method - HTTP method: GET, HEAD, PUT, POST, PATCH or DELETE.
-// resourceType - the type of arvados resource to act on (e.g., "collections", "pipeline_instances").
-// uuid - the uuid of the specific item to access. May be empty.
-// action - API method name (e.g., "lock"). This is often empty if implied by method and uuid.
-// parameters - method parameters.
-// output - a map or annotated struct which is a legal target for encoding/json/Decoder.
+// method - HTTP method: GET, HEAD, PUT, POST, PATCH or DELETE.
+// resourceType - the type of arvados resource to act on (e.g., "collections", "pipeline_instances").
+// uuid - the uuid of the specific item to access. May be empty.
+// action - API method name (e.g., "lock"). This is often empty if implied by method and uuid.
+// parameters - method parameters.
+// output - a map or annotated struct which is a legal target for encoding/json/Decoder.
//
// Returns a non-nil error if an error occurs making the API call, the
// API responds with a non-successful HTTP status, or an error occurs
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartKeep(2, false)
- RetryDelay = 0
+ RetryDelay = 2 * time.Second
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
type MockArvadosServerSuite struct{}
func (s *MockArvadosServerSuite) SetUpSuite(c *C) {
- RetryDelay = 0
+ RetryDelay = 100 * time.Millisecond
}
func (s *MockArvadosServerSuite) SetUpTest(c *C) {
}
func (h *APIStub) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if req.URL.Path == "/redirect-loop" {
- http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
- return
- }
- if h.respStatus[h.retryAttempts] < 0 {
- // Fail the client's Do() by starting a redirect loop
- http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+ if status := h.respStatus[h.retryAttempts]; status < 0 {
+ // Fail the client's Do() by hanging up without
+ // sending an HTTP response header.
+ conn, _, err := resp.(http.Hijacker).Hijack()
+ if err != nil {
+ panic(err)
+ }
+ conn.Write([]byte("zzzzzzzzzz"))
+ conn.Close()
} else {
- resp.WriteHeader(h.respStatus[h.retryAttempts])
+ resp.WriteHeader(status)
resp.Write([]byte(h.responseBody[h.retryAttempts]))
}
h.retryAttempts++
"create", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
},
{
- "get", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "get", 0, 423, []int{500, 500, 423, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "create", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "create", 0, 423, []int{500, 500, 423, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "update", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "update", 0, 422, []int{500, 500, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "delete", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "delete", 0, 422, []int{500, 500, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "get", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "get", 0, 401, []int{500, 502, 401, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "create", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "create", 0, 422, []int{500, 502, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
"get", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
{
"create", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
},
+ {
+ "create", 0, 403, []int{403, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 422, []int{422, 200}, []string{``, `{"ok":"ok"}`},
+ },
{
"get", 0, 404, []int{404, 200}, []string{``, `{"ok":"ok"}`},
},
{
"get", 0, 200, []int{-1, -1, 200}, []string{``, ``, `{"ok":"ok"}`},
},
- // "POST" is not safe to retry: fail after one error
+ // "POST" protocol error is safe to retry
{
- "create", 0, -1, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
+ "create", 0, 200, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
},
} {
+ c.Logf("stub: %#v", stub)
+
api, err := RunFakeArvadosServer(&stub)
c.Check(err, IsNil)
default:
c.Check(err, NotNil)
c.Check(err, ErrorMatches, fmt.Sprintf("arvados API server error: %d.*", stub.expected))
- c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+ if c.Check(err, FitsTypeOf, APIServerError{}) {
+ c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+ }
}
}
}