16427: Add "undeleting collections" doc page in admin section.
[arvados.git] / lib / undelete / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package undelete
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/sirupsen/logrus"
22 )
23
24 var Command command
25
26 type command struct{}
27
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         logger := ctxlog.New(stderr, "text", "info")
31         defer func() {
32                 if err != nil {
33                         logger.WithError(err).Error("fatal")
34                 }
35                 logger.Info("exiting")
36         }()
37
38         loader := config.NewLoader(stdin, logger)
39         loader.SkipLegacy = true
40
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         flags.Usage = func() {
44                 fmt.Fprintf(flags.Output(), `Usage:
45         %s [options ...] { /path/to/manifest.txt | log-entry-uuid } [...]
46
47         This program recovers deleted collections. Recovery is
48         possible when the collection's manifest is still available and
49         all of its data blocks are still available or recoverable
50         (e.g., garbage collection is not enabled, the blocks are too
51         new for garbage collection, the blocks are referenced by other
52         collections, or the blocks have been trashed but not yet
53         deleted).
54
55         Collections can be specified either by filename (a local file
56         containing a manifest with the desired data) or by log UUID
57         (an Arvados log entry, typically a "delete" or "update" event,
58         whose "old attributes" have a manifest with the desired data).
59
60         For each provided collection manifest, once all data blocks
61         are recovered/protected from garbage collection, a new
62         collection is saved and its UUID is printed on stdout.
63
64         Restored collections will belong to the system (root) user.
65
66         Exit status will be zero if recovery is successful, i.e., a
67         collection is saved for each provided manifest.
68 Options:
69 `, prog)
70                 flags.PrintDefaults()
71         }
72         loader.SetupFlags(flags)
73         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
74         err = flags.Parse(args)
75         if err == flag.ErrHelp {
76                 err = nil
77                 return 0
78         } else if err != nil {
79                 return 2
80         }
81
82         if len(flags.Args()) == 0 {
83                 flags.Usage()
84                 return 2
85         }
86
87         lvl, err := logrus.ParseLevel(*loglevel)
88         if err != nil {
89                 return 2
90         }
91         logger.SetLevel(lvl)
92
93         cfg, err := loader.Load()
94         if err != nil {
95                 return 1
96         }
97         cluster, err := cfg.GetCluster("")
98         if err != nil {
99                 return 1
100         }
101         client, err := arvados.NewClientFromConfig(cluster)
102         if err != nil {
103                 return 1
104         }
105         client.AuthToken = cluster.SystemRootToken
106         und := undeleter{
107                 client:  client,
108                 cluster: cluster,
109                 logger:  logger,
110         }
111
112         exitcode := 0
113         for _, src := range flags.Args() {
114                 logger := logger.WithField("src", src)
115                 var mtxt string
116                 if len(src) == 27 && src[5:12] == "-57u5n-" {
117                         var logent struct {
118                                 EventType  string    `json:"event_type"`
119                                 EventAt    time.Time `json:"event_at"`
120                                 ObjectUUID string    `json:"object_uuid"`
121                                 Properties struct {
122                                         OldAttributes struct {
123                                                 ManifestText string `json:"manifest_text"`
124                                         } `json:"old_attributes"`
125                                 } `json:"properties"`
126                         }
127                         err = client.RequestAndDecode(&logent, "GET", "arvados/v1/logs/"+src, nil, nil)
128                         if err != nil {
129                                 logger.WithError(err).Error("failed to load log entry")
130                                 exitcode = 1
131                                 continue
132                         }
133                         logger.WithFields(logrus.Fields{
134                                 "old_collection_uuid": logent.ObjectUUID,
135                                 "logged_event_type":   logent.EventType,
136                                 "logged_event_time":   logent.EventAt,
137                         }).Info("loaded log entry")
138                         mtxt = logent.Properties.OldAttributes.ManifestText
139                 } else {
140                         buf, err := ioutil.ReadFile(src)
141                         if err != nil {
142                                 logger.WithError(err).Error("failed to load manifest data from file")
143                                 exitcode = 1
144                                 continue
145                         }
146                         mtxt = string(buf)
147                 }
148                 uuid, err := und.RecoverManifest(string(mtxt))
149                 if err != nil {
150                         logger.WithError(err).Error("recovery failed")
151                         exitcode = 1
152                         continue
153                 }
154                 logger.WithField("UUID", uuid).Info("recovery succeeded")
155                 fmt.Fprintln(stdout, uuid)
156         }
157         return exitcode
158 }
159
160 type undeleter struct {
161         client  *arvados.Client
162         cluster *arvados.Cluster
163         logger  logrus.FieldLogger
164 }
165
166 var errNotFound = errors.New("not found")
167
168 // Finds the timestamp of the newest copy of blk on svc. Returns
169 // errNotFound if blk is not on svc at all.
170 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
171         found, err := svc.Index(und.client, blk)
172         if err != nil {
173                 logger.WithError(err).Warn("error getting index")
174                 return time.Time{}, err
175         } else if len(found) == 0 {
176                 return time.Time{}, errNotFound
177         }
178         var latest time.Time
179         for _, ent := range found {
180                 t := time.Unix(0, ent.Mtime)
181                 if t.After(latest) {
182                         latest = t
183                 }
184         }
185         logger.WithField("latest", latest).Debug("found")
186         return latest, nil
187 }
188
189 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
190
191 // Ensures the given block exists on the given server and won't be
192 // eligible for trashing until after our chosen deadline (blobsigexp).
193 // Returns an error if the block doesn't exist on the given server, or
194 // has an old timestamp and can't be updated.
195 //
196 // After we decide a block is "safe" (whether or not we had to untrash
197 // it), keep-balance might notice that it's currently unreferenced and
198 // decide to trash it, all before our recovered collection gets
199 // saved. But if the block's timestamp is more recent than blobsigttl,
200 // keepstore will refuse to trash it even if told to by keep-balance.
201 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
202         if latest, err := und.newestMtime(logger, blk, svc); err != nil {
203                 return err
204         } else if latest.Add(blobsigttl).After(blobsigexp) {
205                 return nil
206         }
207         if err := svc.Touch(ctx, und.client, blk); err != nil {
208                 return fmt.Errorf("error updating timestamp: %s", err)
209         }
210         logger.Debug("updated timestamp")
211         if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
212                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
213         } else if err != nil {
214                 return err
215         } else if latest.Add(blobsigttl).After(blobsigexp) {
216                 return nil
217         } else {
218                 return errTouchIneffective
219         }
220 }
221
222 // Untrash and update GC timestamps (as needed) on blocks referenced
223 // by the given manifest, save a new collection and return the new
224 // collection's UUID.
225 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
226         ctx, cancel := context.WithCancel(context.Background())
227         defer cancel()
228
229         coll := arvados.Collection{ManifestText: mtxt}
230         blks, err := coll.SizedDigests()
231         if err != nil {
232                 return "", err
233         }
234         todo := make(chan int, len(blks))
235         for idx := range blks {
236                 todo <- idx
237         }
238         go close(todo)
239
240         var services []arvados.KeepService
241         err = und.client.EachKeepService(func(svc arvados.KeepService) error {
242                 if svc.ServiceType == "proxy" {
243                         und.logger.WithField("service", svc).Debug("ignore proxy service")
244                 } else {
245                         services = append(services, svc)
246                 }
247                 return nil
248         })
249         if err != nil {
250                 return "", fmt.Errorf("error getting list of keep services: %s", err)
251         }
252         und.logger.WithField("services", services).Debug("got list of services")
253
254         // blobsigexp is our deadline for saving the rescued
255         // collection. This must be less than BlobSigningTTL
256         // (otherwise our rescued blocks could be garbage collected
257         // again before we protect them by saving the collection) but
258         // the exact value is somewhat arbitrary. If it's too soon, it
259         // will arrive before we're ready to save, and save will
260         // fail. If it's too late, we'll needlessly update timestamps
261         // on some blocks that were recently written/touched (e.g., by
262         // a previous attempt to rescue this same collection) and
263         // would have lived long enough anyway if left alone.
264         // BlobSigningTTL/2 (typically around 1 week) is much longer
265         // than than we need to recover even a very large collection.
266         blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
267         blobsigexp := time.Now().Add(blobsigttl / 2)
268         und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
269
270         // We'll start a number of threads, each working on
271         // checking/recovering one block at a time. The threads
272         // themselves don't need much CPU/memory, but to avoid hitting
273         // limits on keepstore connections, backend storage bandwidth,
274         // etc., we limit concurrency to 2 per keepstore node.
275         workerThreads := 2 * len(services)
276
277         blkFound := make([]bool, len(blks))
278         var wg sync.WaitGroup
279         for i := 0; i < workerThreads; i++ {
280                 wg.Add(1)
281                 go func() {
282                         defer wg.Done()
283                 nextblk:
284                         for idx := range todo {
285                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
286                                 logger := und.logger.WithField("block", blk)
287                                 for _, untrashing := range []bool{false, true} {
288                                         for _, svc := range services {
289                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
290                                                 if untrashing {
291                                                         if err := svc.Untrash(ctx, und.client, blk); err != nil {
292                                                                 logger.WithError(err).Debug("untrash failed")
293                                                                 continue
294                                                         }
295                                                         logger.Info("untrashed")
296                                                 }
297                                                 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
298                                                 if err == errNotFound {
299                                                         logger.Debug(err)
300                                                 } else if err != nil {
301                                                         logger.Error(err)
302                                                 } else {
303                                                         blkFound[idx] = true
304                                                         continue nextblk
305                                                 }
306                                         }
307                                 }
308                                 logger.Debug("unrecoverable")
309                         }
310                 }()
311         }
312         wg.Wait()
313
314         var have, havenot int
315         for _, ok := range blkFound {
316                 if ok {
317                         have++
318                 } else {
319                         havenot++
320                 }
321         }
322         if havenot > 0 {
323                 if have > 0 {
324                         und.logger.Warn("partial recovery is not implemented")
325                 }
326                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
327         }
328
329         if und.cluster.Collections.BlobSigning {
330                 key := []byte(und.cluster.Collections.BlobSigningKey)
331                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
332         }
333         und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
334         err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
335                 "collection": map[string]interface{}{
336                         "manifest_text": coll.ManifestText,
337                 },
338         })
339         if err != nil {
340                 return "", fmt.Errorf("error saving new collection: %s", err)
341         }
342         und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
343         return coll.UUID, nil
344 }