// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package keepstore import ( "bytes" "context" "crypto/md5" "encoding/json" "errors" "fmt" "io" "net/http" "net/http/httptest" "sort" "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" "github.com/sirupsen/logrus" . "gopkg.in/check.v1" ) func (s *routerSuite) TestPullList_Execute(c *C) { remotecluster := testCluster(c) remotecluster.Volumes = map[string]arvados.Volume{ "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"}, } remoterouter, cancel := testRouter(c, remotecluster, nil) defer cancel() remoteserver := httptest.NewServer(remoterouter) defer remoteserver.Close() router, cancel := testRouter(c, s.cluster, nil) defer cancel() executePullList := func(pullList []PullListItem) string { var logbuf bytes.Buffer logger := logrus.New() logger.Out = &logbuf router.keepstore.logger = logger listjson, err := json.Marshal(pullList) c.Assert(err, IsNil) resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil) c.Check(resp.Code, Equals, http.StatusOK) for { router.puller.cond.L.Lock() todolen := len(router.puller.todo) router.puller.cond.L.Unlock() if todolen == 0 && router.puller.inprogress.Load() == 0 { break } time.Sleep(time.Millisecond) } return logbuf.String() } newRemoteBlock := func(datastring string) string { data := []byte(datastring) hash := fmt.Sprintf("%x", md5.Sum(data)) locator := fmt.Sprintf("%s+%d", hash, len(data)) _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{ Hash: hash, Data: data, }) c.Assert(err, IsNil) return locator } mounts := append([]*mount(nil), router.keepstore.mountsR...) sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) var vols []*stubVolume for _, mount := range mounts { vols = append(vols, mount.volume.(*stubVolume)) } ctx := authContext(arvadostest.ActiveTokenV2) locator := newRemoteBlock("pull available block to unspecified volume") executePullList([]PullListItem{{ Locator: locator, Servers: []string{remoteserver.URL}}}) _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{ Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator), WriteTo: io.Discard}) c.Check(err, IsNil) locator0 := newRemoteBlock("pull available block to specified volume 0") locator1 := newRemoteBlock("pull available block to specified volume 1") executePullList([]PullListItem{ { Locator: locator0, Servers: []string{remoteserver.URL}, MountUUID: vols[0].params.UUID}, { Locator: locator1, Servers: []string{remoteserver.URL}, MountUUID: vols[1].params.UUID}}) c.Check(vols[0].data[locator0[:32]].data, NotNil) c.Check(vols[1].data[locator1[:32]].data, NotNil) locator = fooHash + "+3" logs := executePullList([]PullListItem{{ Locator: locator, Servers: []string{remoteserver.URL}}}) c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n") locator = fooHash + "+3" logs = executePullList([]PullListItem{{ Locator: locator, Servers: []string{""}}}) c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n") locator = newRemoteBlock("log error writing to local volume") vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") } vols[1].blockWrite = vols[0].blockWrite logs = executePullList([]PullListItem{{ Locator: locator, Servers: []string{remoteserver.URL}}}) c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n") vols[0].blockWrite = nil vols[1].blockWrite = nil locator = newRemoteBlock("log error when destination mount does not exist") logs = executePullList([]PullListItem{{ Locator: locator, Servers: []string{remoteserver.URL}, MountUUID: "bogus-mount-uuid"}}) c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n") logs = executePullList([]PullListItem{}) c.Logf("%s", logs) }