// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
	"context"
	"crypto/md5"
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	. "gopkg.in/check.v1"
)

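// TestTrashList_Clear checks that a PUT /trash request replaces the
// trasher's pending work queue: first a one-item list is accepted and
// queued, then an empty list clears it. BlobTrash is disabled so the
// queued item stays in todo (rather than being executed) while we
// inspect it.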
func (s *routerSuite) TestTrashList_Clear(c *C) {
	s.cluster.Collections.BlobTrash = false
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

	resp := call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte(`
		[
		 {
		  "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
		  "block_mtime":1707249451308502672,
		  "mount_uuid":"zzzzz-nyw5e-000000000000000"
		 }
		]
		`), nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(router.trasher.todo, DeepEquals, []TrashListItem{{
		Locator:    "acbd18db4cc2f85cedef654fccc4a4d8+3",
		BlockMtime: 1707249451308502672,
		MountUUID:  "zzzzz-nyw5e-000000000000000",
	}})

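	// An empty trash list replaces, i.e. clears, the pending queue.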
	resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte("[]"), nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	c.Check(router.trasher.todo, HasLen, 0)
}

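// TestTrashList_Execute exercises the trash worker against a mix of
// writable and read-only stub volumes, checking that each block is
// trashed only when its stored timestamp matches the request, its age
// exceeds BlobSigningTTL, and the target mount permits trashing.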
func (s *routerSuite) TestTrashList_Execute(c *C) {
	s.cluster.Collections.BlobTrashConcurrency = 1
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
		"zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
	}
	router, cancel := testRouter(c, s.cluster, nil)
	defer cancel()

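	// Fetch the mount list from the API so the trials below can refer
	// to mounts by UUID.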
	var mounts []struct {
		UUID     string
		DeviceID string `json:"device_id"`
	}
	resp := call(router, "GET", "http://example/mounts", s.cluster.SystemRootToken, nil, nil)
	c.Check(resp.Code, Equals, http.StatusOK)
	err := json.Unmarshal(resp.Body.Bytes(), &mounts)
	c.Assert(err, IsNil)
	c.Assert(mounts, HasLen, 4)

	// Sort mounts by UUID so their order matches the volume map above.
	sort.Slice(mounts, func(i, j int) bool {
		return mounts[i].UUID < mounts[j].UUID
	})

	// Build vols (stub volumes) in the same order as mounts.
	var vols []*stubVolume
	for _, mount := range mounts {
		vols = append(vols, router.keepstore.mounts[mount.UUID].volume.(*stubVolume))
	}

	// The "trial" loop below constructs trashList, which we send to
	// the trasher via the router, plus a slice of checks to run after
	// the trasher has finished executing the list.
	var trashList []TrashListItem
	var checks []func()

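	// tNew is too recent to be eligible for trashing (only half of
	// BlobSigningTTL old); tOld is just past the TTL, so it is eligible.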
	tNew := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() / 2)
	tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)

	for _, trial := range []struct {
		comment        string
		storeMtime     []time.Time
		trashListItems []TrashListItem
		expectData     []bool
	}{
		{
			comment:    "timestamp matches, but is not old enough to trash => skip",
			storeMtime: []time.Time{tNew},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tNew.UnixNano(),
					MountUUID:  mounts[0].UUID,
				},
			},
			expectData: []bool{true},
		},
		{
			comment:    "timestamp matches, and is old enough => trash",
			storeMtime: []time.Time{tOld},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tOld.UnixNano(),
					MountUUID:  mounts[0].UUID,
				},
			},
			expectData: []bool{false},
		},
		{
			comment:    "timestamp matches and is old enough on mount 0, but the request specifies mount 1, where timestamp does not match => skip",
			storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tOld.UnixNano(),
					MountUUID:  mounts[1].UUID,
				},
			},
			expectData: []bool{true, true},
		},
		{
			comment:    "MountUUID unspecified => trash from any mount where timestamp matches, leave alone elsewhere",
			storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tOld.UnixNano(),
				},
			},
			expectData: []bool{false, true},
		},
		{
			comment:    "MountUUID unspecified => trash from multiple mounts if timestamp matches, but skip readonly volumes unless AllowTrashWhenReadOnly",
			storeMtime: []time.Time{tOld, tOld, tOld, tOld},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tOld.UnixNano(),
				},
			},
			expectData: []bool{false, false, true, false},
		},
		{
			comment:    "readonly MountUUID specified => skip",
			storeMtime: []time.Time{tOld, tOld, tOld},
			trashListItems: []TrashListItem{
				{
					BlockMtime: tOld.UnixNano(),
					MountUUID:  mounts[2].UUID,
				},
			},
			expectData: []bool{true, true, true},
		},
	} {
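		// Copy the loop variable so the closures appended to checks
		// below capture this iteration's trial (needed under pre-1.22
		// Go loop-variable semantics).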
		trial := trial
		data := []byte(fmt.Sprintf("trial %+v", trial))
		hash := fmt.Sprintf("%x", md5.Sum(data))
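		// data/hash are derived from the trial struct itself, so each
		// trial operates on a distinct block.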
		for i, t := range trial.storeMtime {
			if t.IsZero() {
				continue
			}
			err := vols[i].BlockWrite(context.Background(), hash, data)
			c.Assert(err, IsNil)
			err = vols[i].blockTouchWithTime(hash, t)
			c.Assert(err, IsNil)
		}
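		// Fill in each item's locator (hash+size) and add it to the
		// combined trash list.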
		for _, item := range trial.trashListItems {
			item.Locator = fmt.Sprintf("%s+%d", hash, len(data))
			trashList = append(trashList, item)
		}
		for i, expect := range trial.expectData {
			i, expect := i, expect
			checks = append(checks, func() {
				ent := vols[i].data[hash]
				dataPresent := ent.data != nil && ent.trash.IsZero()
				c.Check(dataPresent, Equals, expect, Commentf("%s mount %d (%s) expect present=%v but got len(ent.data)=%d ent.trash=%v // %s\nlog:\n%s", hash, i, vols[i].params.UUID, expect, len(ent.data), !ent.trash.IsZero(), trial.comment, vols[i].stubLog.String()))
			})
		}
	}

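	// Send the accumulated trash list in a single request.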
	listjson, err := json.Marshal(trashList)
	c.Assert(err, IsNil)
	resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, listjson, nil)
	c.Check(resp.Code, Equals, http.StatusOK)

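	// Poll until the trasher has drained its todo queue and has no
	// items in progress, then run the accumulated checks.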
	for {
		router.trasher.cond.L.Lock()
		todolen := len(router.trasher.todo)
		router.trasher.cond.L.Unlock()
		if todolen == 0 && router.trasher.inprogress.Load() == 0 {
			break
		}
		time.Sleep(time.Millisecond)
	}

	for _, check := range checks {
		check()
	}
}