1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 package recovercollection
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/sirupsen/logrus"
// RunCommand parses command-line options, loads the cluster
// configuration, and tries to recover one collection for each
// remaining argument, printing the UUID of each newly saved
// collection on stdout.
//
// Each argument is either the path of a file containing a manifest,
// or the UUID of a log entry ("-57u5n-") or of a collection
// ("-4zz18-"). An argument that is shaped like a UUID but has neither
// of those type infixes is reported as an error instead of being
// read as a file name.
//
// Log messages go to stderr. API requests are made with the cluster's
// SystemRootToken. The int result is presumably the process exit
// status (the usage text describes a zero exit status on success) --
// confirm against the caller.
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Log to stderr in text format, starting at "info" level.
30 logger := ctxlog.New(stderr, "text", "info")
33 logger.WithError(err).Error("fatal")
35 logger.Info("exiting")
// The config loader is constructed with stdin and the logger, and
// has SkipLegacy set (presumably disabling legacy config file
// handling -- confirm in lib/config).
38 loader := config.NewLoader(stdin, logger)
39 loader.SkipLegacy = true
// ContinueOnError makes Parse return an error (checked below)
// instead of terminating the process; flag package messages are
// directed to stderr.
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 flags.Usage = func() {
44 fmt.Fprintf(flags.Output(), `Usage:
45 %s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]
47 This program recovers deleted collections. Recovery is
48 possible when the collection's manifest is still available and
49 all of its data blocks are still available or recoverable
50 (e.g., garbage collection is not enabled, the blocks are too
51 new for garbage collection, the blocks are referenced by other
52 collections, or the blocks have been trashed but not yet
55 There are multiple ways to specify a collection to recover:
57 * Path to a local file containing a manifest with the desired
60 * UUID of an Arvados log entry, typically a "delete" or
61 "update" event, whose "old attributes" have a manifest with
64 * UUID of an Arvados collection whose most recent log entry,
65 typically a "delete" or "update" event, has the desired
66 data in its "old attributes"
68 For each provided collection manifest, once all data blocks
69 are recovered/protected from garbage collection, a new
70 collection is saved and its UUID is printed on stdout.
72 Restored collections will belong to the system (root) user.
74 Exit status will be zero if recovery is successful, i.e., a
75 collection is saved for each provided manifest.
80 loader.SetupFlags(flags)
81 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
82 err = flags.Parse(args)
83 if err == flag.ErrHelp {
86 } else if err != nil {
90 if len(flags.Args()) == 0 {
95 lvl, err := logrus.ParseLevel(*loglevel)
101 cfg, err := loader.Load()
105 cluster, err := cfg.GetCluster("")
109 client, err := arvados.NewClientFromConfig(cluster)
113 client.AuthToken = cluster.SystemRootToken
121 for _, src := range flags.Args() {
122 logger := logger.WithField("src", src)
124 if !strings.Contains(src, "/") && len(src) == 27 && src[5] == '-' && src[11] == '-' {
125 var filters []arvados.Filter
126 if src[5:12] == "-57u5n-" {
127 filters = []arvados.Filter{{"uuid", "=", src}}
128 } else if src[5:12] == "-4zz18-" {
129 filters = []arvados.Filter{{"object_uuid", "=", src}}
131 logger.Error("looks like a UUID but not a log or collection UUID (if it's really a file, prepend './')")
137 UUID string `json:"uuid"`
138 EventType string `json:"event_type"`
139 EventAt time.Time `json:"event_at"`
140 ObjectUUID string `json:"object_uuid"`
142 OldAttributes struct {
143 ManifestText string `json:"manifest_text"`
144 } `json:"old_attributes"`
145 } `json:"properties"`
148 err = client.RequestAndDecode(&resp, "GET", "arvados/v1/logs", nil, arvados.ListOptions{
150 Order: []string{"event_at desc"},
154 logger.WithError(err).Error("error looking up log entry")
157 } else if len(resp.Items) == 0 {
158 logger.Error("log entry not found")
162 logent := resp.Items[0]
163 logger.WithFields(logrus.Fields{
165 "old_collection_uuid": logent.ObjectUUID,
166 "logged_event_type": logent.EventType,
167 "logged_event_time": logent.EventAt,
168 "logged_object_uuid": logent.ObjectUUID,
169 }).Info("loaded log entry")
170 mtxt = logent.Properties.OldAttributes.ManifestText
172 logger.Error("log entry properties.old_attributes.manifest_text missing or empty")
177 buf, err := ioutil.ReadFile(src)
179 logger.WithError(err).Error("failed to load manifest data from file")
185 uuid, err := rcvr.RecoverManifest(string(mtxt))
187 logger.WithError(err).Error("recovery failed")
191 logger.WithField("UUID", uuid).Info("recovery succeeded")
192 fmt.Fprintln(stdout, uuid)
// recoverer bundles the API client, cluster configuration, and logger
// used by the recovery methods (newestMtime, ensureSafe,
// RecoverManifest).
197 type recoverer struct {
198 client *arvados.Client
199 cluster *arvados.Cluster
200 logger logrus.FieldLogger
// errNotFound is returned by newestMtime when the index contains no
// entries for the requested block. Callers compare against it with ==.
203 var errNotFound = errors.New("not found")
205 // Finds the timestamp of the newest copy of blk on svc. Returns
206 // errNotFound if blk is not on svc at all.
//
// The lookup requests an index from svc, passing blk as the argument
// (presumably a prefix filter -- confirm against KeepService.Index).
// An error from the index request is logged as a warning and returned
// unchanged along with a zero time.Time.
207 func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
208 found, err := svc.Index(ctx, rcvr.client, blk)
210 logger.WithError(err).Warn("error getting index")
211 return time.Time{}, err
212 } else if len(found) == 0 {
213 return time.Time{}, errNotFound
216 for _, ent := range found {
// Mtime is passed as the nanoseconds argument of time.Unix, i.e.,
// it is interpreted as nanoseconds since the Unix epoch.
217 t := time.Unix(0, ent.Mtime)
222 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective is returned by ensureSafe when a touch request
// reported success but the block's timestamp, re-read from the index,
// still does not satisfy the safety check.
226 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
228 // Ensures the given block exists on the given server and won't be
229 // eligible for trashing until after our chosen deadline (blobsigexp).
230 // Returns an error if the block doesn't exist on the given server, or
231 // has an old timestamp and can't be updated.
233 // After we decide a block is "safe" (whether or not we had to untrash
234 // it), keep-balance might notice that it's currently unreferenced and
235 // decide to trash it, all before our recovered collection gets
236 // saved. But if the block's timestamp is more recent than blobsigttl,
237 // keepstore will refuse to trash it even if told to by keep-balance.
//
// The safety test applied below is: newest timestamp + blobsigttl is
// after blobsigexp. It is evaluated once before touching the block
// and once more after a successful touch.
238 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
// First check: look up the newest timestamp currently on svc.
239 if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
241 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Ask svc to update the block's timestamp.
244 if err := svc.Touch(ctx, rcvr.client, blk); err != nil {
245 return fmt.Errorf("error updating timestamp: %s", err)
247 logger.Debug("updated timestamp")
// Second check: re-read the index to verify the touch took effect.
// A block that has vanished from the index at this point is reported
// as a probable bug.
248 if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
249 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
250 } else if err != nil {
252 } else if latest.Add(blobsigttl).After(blobsigexp) {
255 return errTouchIneffective
259 // Untrash and update GC timestamps (as needed) on blocks referenced
260 // by the given manifest, save a new collection and return the new
261 // collection's UUID.
//
// On failure the returned UUID is empty. Failure cases include: the
// list of keep services cannot be retrieved, one or more blocks
// cannot be recovered (partial recovery is not implemented), or
// saving the new collection fails.
262 func (rcvr recoverer) RecoverManifest(mtxt string) (string, error) {
// ctx is passed to the untrash/index/touch requests and to the final
// collection-create request below.
263 ctx, cancel := context.WithCancel(context.Background())
266 coll := arvados.Collection{ManifestText: mtxt}
267 blks, err := coll.SizedDigests()
// todo has capacity for one entry per block; the workers below
// receive block indexes (into blks) from it.
271 todo := make(chan int, len(blks))
272 for idx := range blks {
// Collect the keep services to check, ignoring those whose type is
// "proxy".
277 var services []arvados.KeepService
278 err = rcvr.client.EachKeepService(func(svc arvados.KeepService) error {
279 if svc.ServiceType == "proxy" {
280 rcvr.logger.WithField("service", svc).Debug("ignore proxy service")
282 services = append(services, svc)
287 return "", fmt.Errorf("error getting list of keep services: %s", err)
289 rcvr.logger.WithField("services", services).Debug("got list of services")
291 // blobsigexp is our deadline for saving the rescued
292 // collection. This must be less than BlobSigningTTL
293 // (otherwise our rescued blocks could be garbage collected
294 // again before we protect them by saving the collection) but
295 // the exact value is somewhat arbitrary. If it's too soon, it
296 // will arrive before we're ready to save, and save will
297 // fail. If it's too late, we'll needlessly update timestamps
298 // on some blocks that were recently written/touched (e.g., by
299 // a previous attempt to rescue this same collection) and
300 // would have lived long enough anyway if left alone.
301 // BlobSigningTTL/2 (typically around 1 week) is much longer
302 // than we need to recover even a very large collection.
303 blobsigttl := rcvr.cluster.Collections.BlobSigningTTL.Duration()
304 blobsigexp := time.Now().Add(blobsigttl / 2)
305 rcvr.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
307 // We'll start a number of threads, each working on
308 // checking/recovering one block at a time. The threads
309 // themselves don't need much CPU/memory, but to avoid hitting
310 // limits on keepstore connections, backend storage bandwidth,
311 // etc., we limit concurrency to 2 per keepstore node.
312 workerThreads := 2 * len(services)
// blkFound has one entry per block, indexed like blks; it is tallied
// into have/havenot further down.
314 blkFound := make([]bool, len(blks))
315 var wg sync.WaitGroup
316 for i := 0; i < workerThreads; i++ {
321 for idx := range todo {
// Use only the part of the sized digest before the first "+".
322 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
323 logger := rcvr.logger.WithField("block", blk)
// Go through all services with untrashing=false first, then again
// with untrashing=true.
324 for _, untrashing := range []bool{false, true} {
325 for _, svc := range services {
// Tag log entries for this attempt with the service's host:port.
326 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
328 if err := svc.Untrash(ctx, rcvr.client, blk); err != nil {
329 logger.WithError(err).Debug("untrash failed")
332 logger.Info("untrashed")
// Check, and if necessary refresh, the block's timestamp on this
// service.
334 err := rcvr.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
335 if err == errNotFound {
337 } else if err != nil {
345 logger.Debug("unrecoverable")
// Tally blkFound; the error below reports havenot out of
// have+havenot blocks as unrecoverable.
351 var have, havenot int
352 for _, ok := range blkFound {
361 rcvr.logger.Warn("partial recovery is not implemented")
363 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// When blob signing is enabled, sign the manifest using the client's
// token, blobsigexp, blobsigttl, and the cluster's signing key.
366 if rcvr.cluster.Collections.BlobSigning {
367 key := []byte(rcvr.cluster.Collections.BlobSigningKey)
368 coll.ManifestText = arvados.SignManifest(coll.ManifestText, rcvr.client.AuthToken, blobsigexp, blobsigttl, key)
370 rcvr.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
// Create the new collection. The API response is decoded into coll,
// which supplies the UUID returned below.
371 err = rcvr.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
372 "collection": map[string]interface{}{
373 "manifest_text": coll.ManifestText,
377 return "", fmt.Errorf("error saving new collection: %s", err)
379 rcvr.logger.WithField("UUID", coll.UUID).Debug("created new collection")
380 return coll.UUID, nil