1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 package recovercollection
18 "git.arvados.org/arvados.git/lib/cmd"
19 "git.arvados.org/arvados.git/lib/config"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
22 "github.com/sirupsen/logrus"
// RunCommand runs the collection-recovery tool.
//
// It parses flags (the config loader's flags plus -log-level), loads the
// cluster config, and builds an API client authorized with the cluster's
// SystemRootToken. Each positional argument is resolved to a manifest --
// read from a local file, or taken from properties.old_attributes.manifest_text
// of a log entry -- and passed to rcvr.RecoverManifest; each resulting
// collection UUID is printed on stdout.
//
// NOTE(review): this excerpt omits many original lines (e.g. 30, 32-33,
// 76-79, 83, 86-88, 108-114), including where rcvr is constructed and the
// return statements. Per the usage text, the exit status is zero only if a
// collection was saved for every argument -- confirm against the full file.
29 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
31 logger := ctxlog.New(stderr, "text", "info")
// NOTE(review): lines 32-33 are not shown; presumably a deferred func that
// logs a non-nil err as fatal and then logs "exiting" -- confirm.
34 logger.WithError(err).Error("fatal")
36 logger.Info("exiting")
39 loader := config.NewLoader(stdin, logger)
40 loader.SkipLegacy = true
42 flags := flag.NewFlagSet(prog, flag.ContinueOnError)
43 flags.Usage = func() {
44 fmt.Fprintf(flags.Output(), `Usage:
45 %s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]
47 This program recovers deleted collections. Recovery is
48 possible when the collection's manifest is still available and
49 all of its data blocks are still available or recoverable
50 (e.g., garbage collection is not enabled, the blocks are too
51 new for garbage collection, the blocks are referenced by other
52 collections, or the blocks have been trashed but not yet
55 There are multiple ways to specify a collection to recover:
57 * Path to a local file containing a manifest with the desired
60 * UUID of an Arvados log entry, typically a "delete" or
61 "update" event, whose "old attributes" have a manifest with
64 * UUID of an Arvados collection whose most recent log entry,
65 typically a "delete" or "update" event, has the desired
66 data in its "old attributes"
68 For each provided collection manifest, once all data blocks
69 are recovered/protected from garbage collection, a new
70 collection is saved and its UUID is printed on stdout.
72 Restored collections will belong to the system (root) user.
74 Exit status will be zero if recovery is successful, i.e., a
75 collection is saved for each provided manifest.
80 loader.SetupFlags(flags)
81 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
// At least one source (manifest path or UUID) is required.
82 if ok, code := cmd.ParseFlags(flags, prog, args, "source [...]", stderr); !ok {
84 } else if flags.NArg() == 0 {
85 fmt.Fprintf(stderr, "missing required arguments (try -help)\n")
// Parse -log-level; presumably applied to logger on lines 90-94 (not shown).
89 lvl, err := logrus.ParseLevel(*loglevel)
95 cfg, err := loader.Load()
99 cluster, err := cfg.GetCluster("")
103 client, err := arvados.NewClientFromConfig(cluster)
// Act as the system root user: per the usage text, restored collections
// belong to the system (root) user.
107 client.AuthToken = cluster.SystemRootToken
// Each positional argument is handled independently; per-source log
// messages carry a "src" field.
115 for _, src := range flags.Args() {
116 logger := logger.WithField("src", src)
// A 27-character argument with no "/" and dashes at offsets 5 and 11 is
// treated as an Arvados UUID. The "-57u5n-" infix selects a log entry by its
// own UUID; "-4zz18-" selects log entries whose object_uuid is that
// collection. A real file with such a name must be given as "./name".
118 if !strings.Contains(src, "/") && len(src) == 27 && src[5] == '-' && src[11] == '-' {
119 var filters []arvados.Filter
120 if src[5:12] == "-57u5n-" {
121 filters = []arvados.Filter{{"uuid", "=", src}}
122 } else if src[5:12] == "-4zz18-" {
123 filters = []arvados.Filter{{"object_uuid", "=", src}}
125 logger.Error("looks like a UUID but not a log or collection UUID (if it's really a file, prepend './')")
// Only the log-entry fields used below are decoded.
131 UUID string `json:"uuid"`
132 EventType string `json:"event_type"`
133 EventAt time.Time `json:"event_at"`
134 ObjectUUID string `json:"object_uuid"`
136 OldAttributes struct {
137 ManifestText string `json:"manifest_text"`
138 } `json:"old_attributes"`
139 } `json:"properties"`
// Newest matching log entry first (event_at desc), so resp.Items[0] below
// is the most recent event for this log/collection UUID.
142 err = client.RequestAndDecode(&resp, "GET", "arvados/v1/logs", nil, arvados.ListOptions{
144 Order: []string{"event_at desc"},
148 logger.WithError(err).Error("error looking up log entry")
151 } else if len(resp.Items) == 0 {
152 logger.Error("log entry not found")
156 logent := resp.Items[0]
// NOTE(review): "old_collection_uuid" and "logged_object_uuid" both log
// logent.ObjectUUID; one of them looks redundant.
157 logger.WithFields(logrus.Fields{
159 "old_collection_uuid": logent.ObjectUUID,
160 "logged_event_type": logent.EventType,
161 "logged_event_time": logent.EventAt,
162 "logged_object_uuid": logent.ObjectUUID,
163 }).Info("loaded log entry")
// The manifest to recover is the one recorded before the logged change.
164 mtxt = logent.Properties.OldAttributes.ManifestText
166 logger.Error("log entry properties.old_attributes.manifest_text missing or empty")
// Non-UUID arguments: read the manifest from a local file.
171 buf, err := ioutil.ReadFile(src)
173 logger.WithError(err).Error("failed to load manifest data from file")
// NOTE(review): mtxt is assigned a string at line 164, so string(mtxt) is
// presumably a no-op conversion -- confirm its declaration (not shown).
179 uuid, err := rcvr.RecoverManifest(string(mtxt))
181 logger.WithError(err).Error("recovery failed")
185 logger.WithField("UUID", uuid).Info("recovery succeeded")
186 fmt.Fprintln(stdout, uuid)
// recoverer carries the state shared by RecoverManifest and its helpers
// (newestMtime, ensureSafe).
191 type recoverer struct {
// client is used for keep service requests (Index, Touch, Untrash,
// EachKeepService) and for the API request that creates the new collection.
192 client *arvados.Client
// cluster supplies Collections.BlobSigningTTL, BlobSigning and
// BlobSigningKey.
193 cluster *arvados.Cluster
// logger is the base logger; per-block/per-service fields are added to it.
194 logger logrus.FieldLogger
// errNotFound is returned by newestMtime when a keep service's index has no
// entry for the requested block. RecoverManifest compares ensureSafe's result
// against it to tell "not on this service" apart from other failures.
197 var errNotFound = errors.New("not found")
199 // Finds the timestamp of the newest copy of blk on svc. Returns
200 // errNotFound if blk is not on svc at all.
//
// Errors from svc.Index are logged as warnings and returned to the caller.
// NOTE(review): the lines that track the maximum mtime and return it
// (208-209, 212-215, 217-218) are not shown in this excerpt.
201 func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
// Ask svc for its index entries for blk (blk is presumably used as a
// prefix filter -- confirm in the SDK).
202 found, err := svc.Index(ctx, rcvr.client, blk)
204 logger.WithError(err).Warn("error getting index")
205 return time.Time{}, err
206 } else if len(found) == 0 {
207 return time.Time{}, errNotFound
210 for _, ent := range found {
// Index mtimes are interpreted as nanoseconds since the Unix epoch.
211 t := time.Unix(0, ent.Mtime)
216 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective is returned by ensureSafe when svc.Touch reported
// success but the re-read index timestamp is still too old to keep the block
// safe until the chosen deadline.
220 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
222 // Ensures the given block exists on the given server and won't be
223 // eligible for trashing until after our chosen deadline (blobsigexp).
224 // Returns an error if the block doesn't exist on the given server, or
225 // has an old timestamp and can't be updated.
227 // After we decide a block is "safe" (whether or not we had to untrash
228 // it), keep-balance might notice that it's currently unreferenced and
229 // decide to trash it, all before our recovered collection gets
230 // saved. But if the block's timestamp is more recent than blobsigttl,
231 // keepstore will refuse to trash it even if told to by keep-balance.
//
// As called from RecoverManifest, blobsigttl is the cluster's
// Collections.BlobSigningTTL and blobsigexp is the chosen save deadline.
// NOTE(review): line 234 is not shown; RecoverManifest's comparison with
// errNotFound suggests lookup errors are returned unchanged -- confirm.
232 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
// Check the existing timestamp first: if it already outlasts the deadline,
// no touch should be needed (lines 236-237 are not shown).
233 if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
235 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Refresh the block's timestamp on this service.
238 if err := svc.Touch(ctx, rcvr.client, blk); err != nil {
239 return fmt.Errorf("error updating timestamp: %s", err)
241 logger.Debug("updated timestamp")
// Re-read the index to confirm the touch actually took effect; if the
// timestamp is still too old, report errTouchIneffective.
242 if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
243 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
244 } else if err != nil {
246 } else if latest.Add(blobsigttl).After(blobsigexp) {
249 return errTouchIneffective
253 // Untrash and update GC timestamps (as needed) on blocks referenced
254 // by the given manifest, save a new collection and return the new
255 // collection's UUID.
//
// If any block cannot be made safe on any non-proxy keep service, an
// "unable to recover N of M blocks" error is returned before the
// collection-create request is made (partial recovery is not implemented).
256 func (rcvr recoverer) RecoverManifest(mtxt string) (string, error) {
// Cancelable context shared by all keep/API requests below (the cancel
// call is on lines not shown).
257 ctx, cancel := context.WithCancel(context.Background())
260 coll := arvados.Collection{ManifestText: mtxt}
261 blks, err := coll.SizedDigests()
// Queue every block index. The channel is buffered to len(blks), so all
// sends can complete before any worker reads (closing it is not shown).
265 todo := make(chan int, len(blks))
266 for idx := range blks {
271 var services []arvados.KeepService
272 err = rcvr.client.EachKeepService(func(svc arvados.KeepService) error {
// Proxy services are skipped; only the remaining (keepstore) services are
// checked, touched and untrashed directly.
273 if svc.ServiceType == "proxy" {
274 rcvr.logger.WithField("service", svc).Debug("ignore proxy service")
276 services = append(services, svc)
281 return "", fmt.Errorf("error getting list of keep services: %s", err)
283 rcvr.logger.WithField("services", services).Debug("got list of services")
285 // blobsigexp is our deadline for saving the rescued
286 // collection. This must be less than BlobSigningTTL
287 // (otherwise our rescued blocks could be garbage collected
288 // again before we protect them by saving the collection) but
289 // the exact value is somewhat arbitrary. If it's too soon, it
290 // will arrive before we're ready to save, and save will
291 // fail. If it's too late, we'll needlessly update timestamps
292 // on some blocks that were recently written/touched (e.g., by
293 // a previous attempt to rescue this same collection) and
294 // would have lived long enough anyway if left alone.
295 // BlobSigningTTL/2 (typically around 1 week) is much longer
296 // than we need to recover even a very large collection.
297 blobsigttl := rcvr.cluster.Collections.BlobSigningTTL.Duration()
298 blobsigexp := time.Now().Add(blobsigttl / 2)
299 rcvr.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
301 // We'll start a number of threads, each working on
302 // checking/recovering one block at a time. The threads
303 // themselves don't need much CPU/memory, but to avoid hitting
304 // limits on keepstore connections, backend storage bandwidth,
305 // etc., we limit concurrency to 2 per keepstore node.
// NOTE(review): with no non-proxy services, workerThreads is 0, no block
// is ever marked found, and recovery fails with "unable to recover".
306 workerThreads := 2 * len(services)
// blkFound[i] records whether blks[i] was made safe on some service.
308 blkFound := make([]bool, len(blks))
309 var wg sync.WaitGroup
310 for i := 0; i < workerThreads; i++ {
315 for idx := range todo {
// Keep only the part of the locator before the first "+" (the hash).
316 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
317 logger := rcvr.logger.WithField("block", blk)
// Two passes: first try every service without untrashing, and only then
// try again with untrashing (the guard on line 321 is not shown -- confirm).
318 for _, untrashing := range []bool{false, true} {
319 for _, svc := range services {
320 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
322 if err := svc.Untrash(ctx, rcvr.client, blk); err != nil {
323 logger.WithError(err).Debug("untrash failed")
326 logger.Info("untrashed")
328 err := rcvr.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
// errNotFound means this service simply lacks the block; other errors and
// the success path are handled on lines not shown (330-338).
329 if err == errNotFound {
331 } else if err != nil {
339 logger.Debug("unrecoverable")
// Tally found/missing blocks; any missing block aborts the save.
345 var have, havenot int
346 for _, ok := range blkFound {
355 rcvr.logger.Warn("partial recovery is not implemented")
357 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// When blob signing is enabled, re-sign the manifest's locators using the
// client's AuthToken and the cluster key, with expiry blobsigexp, so the
// create request below presents valid signatures.
360 if rcvr.cluster.Collections.BlobSigning {
361 key := []byte(rcvr.cluster.Collections.BlobSigningKey)
362 coll.ManifestText = arvados.SignManifest(coll.ManifestText, rcvr.client.AuthToken, blobsigexp, blobsigttl, key)
364 rcvr.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
365 err = rcvr.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
366 "collection": map[string]interface{}{
367 "manifest_text": coll.ManifestText,
371 return "", fmt.Errorf("error saving new collection: %s", err)
373 rcvr.logger.WithField("UUID", coll.UUID).Debug("created new collection")
374 return coll.UUID, nil