// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package keepstore import ( "context" "crypto/md5" "encoding/json" "fmt" "net/http" "sort" "time" "git.arvados.org/arvados.git/sdk/go/arvados" . "gopkg.in/check.v1" ) func (s *routerSuite) TestTrashList_Clear(c *C) { s.cluster.Collections.BlobTrash = false router, cancel := testRouter(c, s.cluster, nil) defer cancel() resp := call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte(` [ { "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", "block_mtime":1707249451308502672, "mount_uuid":"zzzzz-nyw5e-000000000000000" } ] `), nil) c.Check(resp.Code, Equals, http.StatusOK) c.Check(router.trasher.todo, DeepEquals, []TrashListItem{{ Locator: "acbd18db4cc2f85cedef654fccc4a4d8+3", BlockMtime: 1707249451308502672, MountUUID: "zzzzz-nyw5e-000000000000000", }}) resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte("[]"), nil) c.Check(resp.Code, Equals, http.StatusOK) c.Check(router.trasher.todo, HasLen, 0) } func (s *routerSuite) TestTrashList_Execute(c *C) { s.cluster.Collections.BlobTrashConcurrency = 1 s.cluster.Volumes = map[string]arvados.Volume{ "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"}, "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true}, "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true}, } router, cancel := testRouter(c, s.cluster, nil) defer cancel() var mounts []struct { UUID string DeviceID string `json:"device_id"` } resp := call(router, "GET", "http://example/mounts", s.cluster.SystemRootToken, nil, nil) c.Check(resp.Code, Equals, http.StatusOK) err := json.Unmarshal(resp.Body.Bytes(), &mounts) c.Assert(err, IsNil) c.Assert(mounts, HasLen, 4) // Sort mounts by UUID sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) // Make vols (stub volumes) in same order as mounts var vols []*stubVolume for _, mount := range mounts { vols = append(vols, router.keepstore.mounts[mount.UUID].volume.(*stubVolume)) } // The "trial" loop below will construct the trashList which // we'll send to trasher via router, plus a slice of checks // which we'll run after the trasher has finished executing // the list. var trashList []TrashListItem var checks []func() tNew := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() / 2) tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second) for _, trial := range []struct { comment string storeMtime []time.Time trashListItems []TrashListItem expectData []bool }{ { comment: "timestamp matches, but is not old enough to trash => skip", storeMtime: []time.Time{tNew}, trashListItems: []TrashListItem{ { BlockMtime: tNew.UnixNano(), MountUUID: mounts[0].UUID, }, }, expectData: []bool{true}, }, { comment: "timestamp matches, and is old enough => trash", storeMtime: []time.Time{tOld}, trashListItems: []TrashListItem{ { BlockMtime: tOld.UnixNano(), MountUUID: mounts[0].UUID, }, }, expectData: []bool{false}, }, { comment: "timestamp matches and is old enough on mount 0, but the request specifies mount 1, where timestamp does not match => skip", storeMtime: []time.Time{tOld, tOld.Add(-time.Second)}, trashListItems: []TrashListItem{ { BlockMtime: tOld.UnixNano(), MountUUID: mounts[1].UUID, }, }, expectData: []bool{true, true}, }, { comment: "MountUUID unspecified => trash from any mount where timestamp matches, leave alone elsewhere", storeMtime: []time.Time{tOld, tOld.Add(-time.Second)}, trashListItems: []TrashListItem{ { BlockMtime: tOld.UnixNano(), }, }, expectData: []bool{false, true}, }, { comment: "MountUUID unspecified => trash from multiple mounts if timestamp matches, but skip readonly volumes unless AllowTrashWhenReadOnly", storeMtime: []time.Time{tOld, tOld, tOld, tOld}, trashListItems: []TrashListItem{ { BlockMtime: tOld.UnixNano(), }, }, expectData: []bool{false, false, true, false}, }, { comment: "readonly MountUUID specified => skip", storeMtime: []time.Time{tOld, tOld, tOld}, trashListItems: []TrashListItem{ { BlockMtime: tOld.UnixNano(), MountUUID: mounts[2].UUID, }, }, expectData: []bool{true, true, true}, }, } { trial := trial data := []byte(fmt.Sprintf("trial %+v", trial)) hash := fmt.Sprintf("%x", md5.Sum(data)) for i, t := range trial.storeMtime { if t.IsZero() { continue } err := vols[i].BlockWrite(context.Background(), hash, data) c.Assert(err, IsNil) err = vols[i].blockTouchWithTime(hash, t) c.Assert(err, IsNil) } for _, item := range trial.trashListItems { item.Locator = fmt.Sprintf("%s+%d", hash, len(data)) trashList = append(trashList, item) } for i, expect := range trial.expectData { i, expect := i, expect checks = append(checks, func() { ent := vols[i].data[hash] dataPresent := ent.data != nil && ent.trash.IsZero() c.Check(dataPresent, Equals, expect, Commentf("%s mount %d (%s) expect present=%v but got len(ent.data)=%d ent.trash=%v // %s\nlog:\n%s", hash, i, vols[i].params.UUID, expect, len(ent.data), !ent.trash.IsZero(), trial.comment, vols[i].stubLog.String())) }) } } listjson, err := json.Marshal(trashList) resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, listjson, nil) c.Check(resp.Code, Equals, http.StatusOK) for { router.trasher.cond.L.Lock() todolen := len(router.trasher.todo) router.trasher.cond.L.Unlock() if todolen == 0 && router.trasher.inprogress.Load() == 0 { break } time.Sleep(time.Millisecond) } for _, check := range checks { check() } }