Merge branch '16833-replace-epydoc' into master
[arvados.git] / lib / recovercollection / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package recovercollection
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/sirupsen/logrus"
22 )
23
24 var Command command
25
26 type command struct{}
27
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         logger := ctxlog.New(stderr, "text", "info")
31         defer func() {
32                 if err != nil {
33                         logger.WithError(err).Error("fatal")
34                 }
35                 logger.Info("exiting")
36         }()
37
38         loader := config.NewLoader(stdin, logger)
39         loader.SkipLegacy = true
40
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         flags.Usage = func() {
44                 fmt.Fprintf(flags.Output(), `Usage:
45         %s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]
46
47         This program recovers deleted collections. Recovery is
48         possible when the collection's manifest is still available and
49         all of its data blocks are still available or recoverable
50         (e.g., garbage collection is not enabled, the blocks are too
51         new for garbage collection, the blocks are referenced by other
52         collections, or the blocks have been trashed but not yet
53         deleted).
54
55         There are multiple ways to specify a collection to recover:
56
57         * Path to a local file containing a manifest with the desired
58           data
59
60         * UUID of an Arvados log entry, typically a "delete" or
61           "update" event, whose "old attributes" have a manifest with
62           the desired data
63
64         * UUID of an Arvados collection whose most recent log entry,
65           typically a "delete" or "update" event, has the desired
66           data in its "old attributes"
67
68         For each provided collection manifest, once all data blocks
69         are recovered/protected from garbage collection, a new
70         collection is saved and its UUID is printed on stdout.
71
72         Restored collections will belong to the system (root) user.
73
74         Exit status will be zero if recovery is successful, i.e., a
75         collection is saved for each provided manifest.
76 Options:
77 `, prog)
78                 flags.PrintDefaults()
79         }
80         loader.SetupFlags(flags)
81         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
82         err = flags.Parse(args)
83         if err == flag.ErrHelp {
84                 err = nil
85                 return 0
86         } else if err != nil {
87                 return 2
88         }
89
90         if len(flags.Args()) == 0 {
91                 flags.Usage()
92                 return 2
93         }
94
95         lvl, err := logrus.ParseLevel(*loglevel)
96         if err != nil {
97                 return 2
98         }
99         logger.SetLevel(lvl)
100
101         cfg, err := loader.Load()
102         if err != nil {
103                 return 1
104         }
105         cluster, err := cfg.GetCluster("")
106         if err != nil {
107                 return 1
108         }
109         client, err := arvados.NewClientFromConfig(cluster)
110         if err != nil {
111                 return 1
112         }
113         client.AuthToken = cluster.SystemRootToken
114         rcvr := recoverer{
115                 client:  client,
116                 cluster: cluster,
117                 logger:  logger,
118         }
119
120         exitcode := 0
121         for _, src := range flags.Args() {
122                 logger := logger.WithField("src", src)
123                 var mtxt string
124                 if !strings.Contains(src, "/") && len(src) == 27 && src[5] == '-' && src[11] == '-' {
125                         var filters []arvados.Filter
126                         if src[5:12] == "-57u5n-" {
127                                 filters = []arvados.Filter{{"uuid", "=", src}}
128                         } else if src[5:12] == "-4zz18-" {
129                                 filters = []arvados.Filter{{"object_uuid", "=", src}}
130                         } else {
131                                 logger.Error("looks like a UUID but not a log or collection UUID (if it's really a file, prepend './')")
132                                 exitcode = 1
133                                 continue
134                         }
135                         var resp struct {
136                                 Items []struct {
137                                         UUID       string    `json:"uuid"`
138                                         EventType  string    `json:"event_type"`
139                                         EventAt    time.Time `json:"event_at"`
140                                         ObjectUUID string    `json:"object_uuid"`
141                                         Properties struct {
142                                                 OldAttributes struct {
143                                                         ManifestText string `json:"manifest_text"`
144                                                 } `json:"old_attributes"`
145                                         } `json:"properties"`
146                                 }
147                         }
148                         err = client.RequestAndDecode(&resp, "GET", "arvados/v1/logs", nil, arvados.ListOptions{
149                                 Limit:   1,
150                                 Order:   []string{"event_at desc"},
151                                 Filters: filters,
152                         })
153                         if err != nil {
154                                 logger.WithError(err).Error("error looking up log entry")
155                                 exitcode = 1
156                                 continue
157                         } else if len(resp.Items) == 0 {
158                                 logger.Error("log entry not found")
159                                 exitcode = 1
160                                 continue
161                         }
162                         logent := resp.Items[0]
163                         logger.WithFields(logrus.Fields{
164                                 "uuid":                logent.UUID,
165                                 "old_collection_uuid": logent.ObjectUUID,
166                                 "logged_event_type":   logent.EventType,
167                                 "logged_event_time":   logent.EventAt,
168                                 "logged_object_uuid":  logent.ObjectUUID,
169                         }).Info("loaded log entry")
170                         mtxt = logent.Properties.OldAttributes.ManifestText
171                         if mtxt == "" {
172                                 logger.Error("log entry properties.old_attributes.manifest_text missing or empty")
173                                 exitcode = 1
174                                 continue
175                         }
176                 } else {
177                         buf, err := ioutil.ReadFile(src)
178                         if err != nil {
179                                 logger.WithError(err).Error("failed to load manifest data from file")
180                                 exitcode = 1
181                                 continue
182                         }
183                         mtxt = string(buf)
184                 }
185                 uuid, err := rcvr.RecoverManifest(string(mtxt))
186                 if err != nil {
187                         logger.WithError(err).Error("recovery failed")
188                         exitcode = 1
189                         continue
190                 }
191                 logger.WithField("UUID", uuid).Info("recovery succeeded")
192                 fmt.Fprintln(stdout, uuid)
193         }
194         return exitcode
195 }
196
197 type recoverer struct {
198         client  *arvados.Client
199         cluster *arvados.Cluster
200         logger  logrus.FieldLogger
201 }
202
203 var errNotFound = errors.New("not found")
204
205 // Finds the timestamp of the newest copy of blk on svc. Returns
206 // errNotFound if blk is not on svc at all.
207 func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
208         found, err := svc.Index(ctx, rcvr.client, blk)
209         if err != nil {
210                 logger.WithError(err).Warn("error getting index")
211                 return time.Time{}, err
212         } else if len(found) == 0 {
213                 return time.Time{}, errNotFound
214         }
215         var latest time.Time
216         for _, ent := range found {
217                 t := time.Unix(0, ent.Mtime)
218                 if t.After(latest) {
219                         latest = t
220                 }
221         }
222         logger.WithField("latest", latest).Debug("found")
223         return latest, nil
224 }
225
226 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
227
228 // Ensures the given block exists on the given server and won't be
229 // eligible for trashing until after our chosen deadline (blobsigexp).
230 // Returns an error if the block doesn't exist on the given server, or
231 // has an old timestamp and can't be updated.
232 //
233 // After we decide a block is "safe" (whether or not we had to untrash
234 // it), keep-balance might notice that it's currently unreferenced and
235 // decide to trash it, all before our recovered collection gets
236 // saved. But if the block's timestamp is more recent than blobsigttl,
237 // keepstore will refuse to trash it even if told to by keep-balance.
238 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
239         if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
240                 return err
241         } else if latest.Add(blobsigttl).After(blobsigexp) {
242                 return nil
243         }
244         if err := svc.Touch(ctx, rcvr.client, blk); err != nil {
245                 return fmt.Errorf("error updating timestamp: %s", err)
246         }
247         logger.Debug("updated timestamp")
248         if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
249                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
250         } else if err != nil {
251                 return err
252         } else if latest.Add(blobsigttl).After(blobsigexp) {
253                 return nil
254         } else {
255                 return errTouchIneffective
256         }
257 }
258
259 // Untrash and update GC timestamps (as needed) on blocks referenced
260 // by the given manifest, save a new collection and return the new
261 // collection's UUID.
262 func (rcvr recoverer) RecoverManifest(mtxt string) (string, error) {
263         ctx, cancel := context.WithCancel(context.Background())
264         defer cancel()
265
266         coll := arvados.Collection{ManifestText: mtxt}
267         blks, err := coll.SizedDigests()
268         if err != nil {
269                 return "", err
270         }
271         todo := make(chan int, len(blks))
272         for idx := range blks {
273                 todo <- idx
274         }
275         go close(todo)
276
277         var services []arvados.KeepService
278         err = rcvr.client.EachKeepService(func(svc arvados.KeepService) error {
279                 if svc.ServiceType == "proxy" {
280                         rcvr.logger.WithField("service", svc).Debug("ignore proxy service")
281                 } else {
282                         services = append(services, svc)
283                 }
284                 return nil
285         })
286         if err != nil {
287                 return "", fmt.Errorf("error getting list of keep services: %s", err)
288         }
289         rcvr.logger.WithField("services", services).Debug("got list of services")
290
291         // blobsigexp is our deadline for saving the rescued
292         // collection. This must be less than BlobSigningTTL
293         // (otherwise our rescued blocks could be garbage collected
294         // again before we protect them by saving the collection) but
295         // the exact value is somewhat arbitrary. If it's too soon, it
296         // will arrive before we're ready to save, and save will
297         // fail. If it's too late, we'll needlessly update timestamps
298         // on some blocks that were recently written/touched (e.g., by
299         // a previous attempt to rescue this same collection) and
300         // would have lived long enough anyway if left alone.
301         // BlobSigningTTL/2 (typically around 1 week) is much longer
302         // than than we need to recover even a very large collection.
303         blobsigttl := rcvr.cluster.Collections.BlobSigningTTL.Duration()
304         blobsigexp := time.Now().Add(blobsigttl / 2)
305         rcvr.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
306
307         // We'll start a number of threads, each working on
308         // checking/recovering one block at a time. The threads
309         // themselves don't need much CPU/memory, but to avoid hitting
310         // limits on keepstore connections, backend storage bandwidth,
311         // etc., we limit concurrency to 2 per keepstore node.
312         workerThreads := 2 * len(services)
313
314         blkFound := make([]bool, len(blks))
315         var wg sync.WaitGroup
316         for i := 0; i < workerThreads; i++ {
317                 wg.Add(1)
318                 go func() {
319                         defer wg.Done()
320                 nextblk:
321                         for idx := range todo {
322                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
323                                 logger := rcvr.logger.WithField("block", blk)
324                                 for _, untrashing := range []bool{false, true} {
325                                         for _, svc := range services {
326                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
327                                                 if untrashing {
328                                                         if err := svc.Untrash(ctx, rcvr.client, blk); err != nil {
329                                                                 logger.WithError(err).Debug("untrash failed")
330                                                                 continue
331                                                         }
332                                                         logger.Info("untrashed")
333                                                 }
334                                                 err := rcvr.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
335                                                 if err == errNotFound {
336                                                         logger.Debug(err)
337                                                 } else if err != nil {
338                                                         logger.Error(err)
339                                                 } else {
340                                                         blkFound[idx] = true
341                                                         continue nextblk
342                                                 }
343                                         }
344                                 }
345                                 logger.Debug("unrecoverable")
346                         }
347                 }()
348         }
349         wg.Wait()
350
351         var have, havenot int
352         for _, ok := range blkFound {
353                 if ok {
354                         have++
355                 } else {
356                         havenot++
357                 }
358         }
359         if havenot > 0 {
360                 if have > 0 {
361                         rcvr.logger.Warn("partial recovery is not implemented")
362                 }
363                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
364         }
365
366         if rcvr.cluster.Collections.BlobSigning {
367                 key := []byte(rcvr.cluster.Collections.BlobSigningKey)
368                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, rcvr.client.AuthToken, blobsigexp, blobsigttl, key)
369         }
370         rcvr.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
371         err = rcvr.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
372                 "collection": map[string]interface{}{
373                         "manifest_text": coll.ManifestText,
374                 },
375         })
376         if err != nil {
377                 return "", fmt.Errorf("error saving new collection: %s", err)
378         }
379         rcvr.logger.WithField("UUID", coll.UUID).Debug("created new collection")
380         return coll.UUID, nil
381 }