21285: Send all SSH and tunnel endpoints to tunnel queue.
[arvados.git] / lib / recovercollection / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package recovercollection
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/lib/config"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/sirupsen/logrus"
23 )
24
25 var Command command
26
27 type command struct{}
28
29 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
30         var err error
31         logger := ctxlog.New(stderr, "text", "info")
32         defer func() {
33                 if err != nil {
34                         logger.WithError(err).Error("fatal")
35                 }
36                 logger.Info("exiting")
37         }()
38
39         loader := config.NewLoader(stdin, logger)
40         loader.SkipLegacy = true
41
42         flags := flag.NewFlagSet(prog, flag.ContinueOnError)
43         flags.Usage = func() {
44                 fmt.Fprintf(flags.Output(), `Usage:
45         %s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]
46
47         This program recovers deleted collections. Recovery is
48         possible when the collection's manifest is still available and
49         all of its data blocks are still available or recoverable
50         (e.g., garbage collection is not enabled, the blocks are too
51         new for garbage collection, the blocks are referenced by other
52         collections, or the blocks have been trashed but not yet
53         deleted).
54
55         There are multiple ways to specify a collection to recover:
56
57         * Path to a local file containing a manifest with the desired
58           data
59
60         * UUID of an Arvados log entry, typically a "delete" or
61           "update" event, whose "old attributes" have a manifest with
62           the desired data
63
64         * UUID of an Arvados collection whose most recent log entry,
65           typically a "delete" or "update" event, has the desired
66           data in its "old attributes"
67
68         For each provided collection manifest, once all data blocks
69         are recovered/protected from garbage collection, a new
70         collection is saved and its UUID is printed on stdout.
71
72         Restored collections will belong to the system (root) user.
73
74         Exit status will be zero if recovery is successful, i.e., a
75         collection is saved for each provided manifest.
76 Options:
77 `, prog)
78                 flags.PrintDefaults()
79         }
80         loader.SetupFlags(flags)
81         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
82         if ok, code := cmd.ParseFlags(flags, prog, args, "source [...]", stderr); !ok {
83                 return code
84         } else if flags.NArg() == 0 {
85                 fmt.Fprintf(stderr, "missing required arguments (try -help)\n")
86                 return 2
87         }
88
89         lvl, err := logrus.ParseLevel(*loglevel)
90         if err != nil {
91                 return 2
92         }
93         logger.SetLevel(lvl)
94
95         cfg, err := loader.Load()
96         if err != nil {
97                 return 1
98         }
99         cluster, err := cfg.GetCluster("")
100         if err != nil {
101                 return 1
102         }
103         client, err := arvados.NewClientFromConfig(cluster)
104         if err != nil {
105                 return 1
106         }
107         client.AuthToken = cluster.SystemRootToken
108         rcvr := recoverer{
109                 client:  client,
110                 cluster: cluster,
111                 logger:  logger,
112         }
113
114         exitcode := 0
115         for _, src := range flags.Args() {
116                 logger := logger.WithField("src", src)
117                 var mtxt string
118                 if !strings.Contains(src, "/") && len(src) == 27 && src[5] == '-' && src[11] == '-' {
119                         var filters []arvados.Filter
120                         if src[5:12] == "-57u5n-" {
121                                 filters = []arvados.Filter{{"uuid", "=", src}}
122                         } else if src[5:12] == "-4zz18-" {
123                                 filters = []arvados.Filter{{"object_uuid", "=", src}}
124                         } else {
125                                 logger.Error("looks like a UUID but not a log or collection UUID (if it's really a file, prepend './')")
126                                 exitcode = 1
127                                 continue
128                         }
129                         var resp struct {
130                                 Items []struct {
131                                         UUID       string    `json:"uuid"`
132                                         EventType  string    `json:"event_type"`
133                                         EventAt    time.Time `json:"event_at"`
134                                         ObjectUUID string    `json:"object_uuid"`
135                                         Properties struct {
136                                                 OldAttributes struct {
137                                                         ManifestText string `json:"manifest_text"`
138                                                 } `json:"old_attributes"`
139                                         } `json:"properties"`
140                                 }
141                         }
142                         err = client.RequestAndDecode(&resp, "GET", "arvados/v1/logs", nil, arvados.ListOptions{
143                                 Limit:   1,
144                                 Order:   []string{"event_at desc"},
145                                 Filters: filters,
146                         })
147                         if err != nil {
148                                 logger.WithError(err).Error("error looking up log entry")
149                                 exitcode = 1
150                                 continue
151                         } else if len(resp.Items) == 0 {
152                                 logger.Error("log entry not found")
153                                 exitcode = 1
154                                 continue
155                         }
156                         logent := resp.Items[0]
157                         logger.WithFields(logrus.Fields{
158                                 "uuid":                logent.UUID,
159                                 "old_collection_uuid": logent.ObjectUUID,
160                                 "logged_event_type":   logent.EventType,
161                                 "logged_event_time":   logent.EventAt,
162                                 "logged_object_uuid":  logent.ObjectUUID,
163                         }).Info("loaded log entry")
164                         mtxt = logent.Properties.OldAttributes.ManifestText
165                         if mtxt == "" {
166                                 logger.Error("log entry properties.old_attributes.manifest_text missing or empty")
167                                 exitcode = 1
168                                 continue
169                         }
170                 } else {
171                         buf, err := ioutil.ReadFile(src)
172                         if err != nil {
173                                 logger.WithError(err).Error("failed to load manifest data from file")
174                                 exitcode = 1
175                                 continue
176                         }
177                         mtxt = string(buf)
178                 }
179                 uuid, err := rcvr.RecoverManifest(string(mtxt))
180                 if err != nil {
181                         logger.WithError(err).Error("recovery failed")
182                         exitcode = 1
183                         continue
184                 }
185                 logger.WithField("UUID", uuid).Info("recovery succeeded")
186                 fmt.Fprintln(stdout, uuid)
187         }
188         return exitcode
189 }
190
191 type recoverer struct {
192         client  *arvados.Client
193         cluster *arvados.Cluster
194         logger  logrus.FieldLogger
195 }
196
197 var errNotFound = errors.New("not found")
198
199 // Finds the timestamp of the newest copy of blk on svc. Returns
200 // errNotFound if blk is not on svc at all.
201 func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
202         found, err := svc.Index(ctx, rcvr.client, blk)
203         if err != nil {
204                 logger.WithError(err).Warn("error getting index")
205                 return time.Time{}, err
206         } else if len(found) == 0 {
207                 return time.Time{}, errNotFound
208         }
209         var latest time.Time
210         for _, ent := range found {
211                 t := time.Unix(0, ent.Mtime)
212                 if t.After(latest) {
213                         latest = t
214                 }
215         }
216         logger.WithField("latest", latest).Debug("found")
217         return latest, nil
218 }
219
220 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
221
222 // Ensures the given block exists on the given server and won't be
223 // eligible for trashing until after our chosen deadline (blobsigexp).
224 // Returns an error if the block doesn't exist on the given server, or
225 // has an old timestamp and can't be updated.
226 //
227 // After we decide a block is "safe" (whether or not we had to untrash
228 // it), keep-balance might notice that it's currently unreferenced and
229 // decide to trash it, all before our recovered collection gets
230 // saved. But if the block's timestamp is more recent than blobsigttl,
231 // keepstore will refuse to trash it even if told to by keep-balance.
232 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
233         if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
234                 return err
235         } else if latest.Add(blobsigttl).After(blobsigexp) {
236                 return nil
237         }
238         if err := svc.Touch(ctx, rcvr.client, blk); err != nil {
239                 return fmt.Errorf("error updating timestamp: %s", err)
240         }
241         logger.Debug("updated timestamp")
242         if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
243                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
244         } else if err != nil {
245                 return err
246         } else if latest.Add(blobsigttl).After(blobsigexp) {
247                 return nil
248         } else {
249                 return errTouchIneffective
250         }
251 }
252
253 // Untrash and update GC timestamps (as needed) on blocks referenced
254 // by the given manifest, save a new collection and return the new
255 // collection's UUID.
256 func (rcvr recoverer) RecoverManifest(mtxt string) (string, error) {
257         ctx, cancel := context.WithCancel(context.Background())
258         defer cancel()
259
260         coll := arvados.Collection{ManifestText: mtxt}
261         blks, err := coll.SizedDigests()
262         if err != nil {
263                 return "", err
264         }
265         todo := make(chan int, len(blks))
266         for idx := range blks {
267                 todo <- idx
268         }
269         go close(todo)
270
271         var services []arvados.KeepService
272         err = rcvr.client.EachKeepService(func(svc arvados.KeepService) error {
273                 if svc.ServiceType == "proxy" {
274                         rcvr.logger.WithField("service", svc).Debug("ignore proxy service")
275                 } else {
276                         services = append(services, svc)
277                 }
278                 return nil
279         })
280         if err != nil {
281                 return "", fmt.Errorf("error getting list of keep services: %s", err)
282         }
283         rcvr.logger.WithField("services", services).Debug("got list of services")
284
285         // blobsigexp is our deadline for saving the rescued
286         // collection. This must be less than BlobSigningTTL
287         // (otherwise our rescued blocks could be garbage collected
288         // again before we protect them by saving the collection) but
289         // the exact value is somewhat arbitrary. If it's too soon, it
290         // will arrive before we're ready to save, and save will
291         // fail. If it's too late, we'll needlessly update timestamps
292         // on some blocks that were recently written/touched (e.g., by
293         // a previous attempt to rescue this same collection) and
294         // would have lived long enough anyway if left alone.
295         // BlobSigningTTL/2 (typically around 1 week) is much longer
296         // than than we need to recover even a very large collection.
297         blobsigttl := rcvr.cluster.Collections.BlobSigningTTL.Duration()
298         blobsigexp := time.Now().Add(blobsigttl / 2)
299         rcvr.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
300
301         // We'll start a number of threads, each working on
302         // checking/recovering one block at a time. The threads
303         // themselves don't need much CPU/memory, but to avoid hitting
304         // limits on keepstore connections, backend storage bandwidth,
305         // etc., we limit concurrency to 2 per keepstore node.
306         workerThreads := 2 * len(services)
307
308         blkFound := make([]bool, len(blks))
309         var wg sync.WaitGroup
310         for i := 0; i < workerThreads; i++ {
311                 wg.Add(1)
312                 go func() {
313                         defer wg.Done()
314                 nextblk:
315                         for idx := range todo {
316                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
317                                 logger := rcvr.logger.WithField("block", blk)
318                                 for _, untrashing := range []bool{false, true} {
319                                         for _, svc := range services {
320                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
321                                                 if untrashing {
322                                                         if err := svc.Untrash(ctx, rcvr.client, blk); err != nil {
323                                                                 logger.WithError(err).Debug("untrash failed")
324                                                                 continue
325                                                         }
326                                                         logger.Info("untrashed")
327                                                 }
328                                                 err := rcvr.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
329                                                 if err == errNotFound {
330                                                         logger.Debug(err)
331                                                 } else if err != nil {
332                                                         logger.Error(err)
333                                                 } else {
334                                                         blkFound[idx] = true
335                                                         continue nextblk
336                                                 }
337                                         }
338                                 }
339                                 logger.Debug("unrecoverable")
340                         }
341                 }()
342         }
343         wg.Wait()
344
345         var have, havenot int
346         for _, ok := range blkFound {
347                 if ok {
348                         have++
349                 } else {
350                         havenot++
351                 }
352         }
353         if havenot > 0 {
354                 if have > 0 {
355                         rcvr.logger.Warn("partial recovery is not implemented")
356                 }
357                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
358         }
359
360         if rcvr.cluster.Collections.BlobSigning {
361                 key := []byte(rcvr.cluster.Collections.BlobSigningKey)
362                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, rcvr.client.AuthToken, blobsigexp, blobsigttl, key)
363         }
364         rcvr.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
365         err = rcvr.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
366                 "collection": map[string]interface{}{
367                         "manifest_text": coll.ManifestText,
368                 },
369         })
370         if err != nil {
371                 return "", fmt.Errorf("error saving new collection: %s", err)
372         }
373         rcvr.logger.WithField("UUID", coll.UUID).Debug("created new collection")
374         return coll.UUID, nil
375 }