1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/sirupsen/logrus"
// RunCommand implements the undelete tool. It loads the cluster
// configuration and parses flags. It then handles each remaining
// argument as either a log entry UUID (lookup not implemented) or the
// path of a file containing manifest text, which is passed to
// undeleter.RecoverManifest. Each recovered collection's UUID is
// printed to stdout, one per line.
// NOTE(review): the int result is the process exit status; the return
// statements are not visible in this view, so exact codes are unconfirmed.
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Log to stderr as text, at info level by default.
30 logger := ctxlog.New(stderr, "text", "info")
33 logger.WithError(err).Error("fatal")
35 logger.Info("exiting")
38 loader := config.NewLoader(stdin, logger)
// ContinueOnError makes Parse return errors (including flag.ErrHelp)
// instead of exiting the process, so they can be handled below.
40 flags := flag.NewFlagSet("", flag.ContinueOnError)
41 flags.SetOutput(stderr)
// Register the config loader's own flags on this flag set.
42 loader.SetupFlags(flags)
43 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
44 err = flags.Parse(args)
45 if err == flag.ErrHelp {
48 } else if err != nil {
// At least one UUID or manifest file argument is required.
52 if len(flags.Args()) == 0 {
53 fmt.Fprintf(stderr, "Usage: %s [options] uuid_or_file ...\n", prog)
// Convert the -log-level value to a logrus level.
58 lvl, err := logrus.ParseLevel(*loglevel)
64 cfg, err := loader.Load()
68 cluster, err := cfg.GetCluster("")
72 client, err := arvados.NewClientFromConfig(cluster)
// Authenticate requests made with this client using the cluster's
// SystemRootToken.
76 client.AuthToken = cluster.SystemRootToken
84 for _, src := range flags.Args() {
// Tag every log message about this argument with its source.
85 logger := logger.WithField("src", src)
// A 27-character argument with "-57u5n-" at offset 5 has the shape of
// a log entry UUID; looking those up is not implemented.
86 if len(src) == 27 && src[5:12] == "-57u5n-" {
87 logger.Error("log entry lookup not implemented")
// Any other argument is read as a file containing manifest text.
91 mtxt, err := ioutil.ReadFile(src)
93 logger.WithError(err).Error("error loading manifest data")
97 uuid, err := und.RecoverManifest(string(mtxt))
99 logger.WithError(err).Error("recovery failed")
103 logger.WithField("UUID", uuid).Info("recovery succeeded")
// Print each recovered collection UUID on its own line.
104 fmt.Fprintln(stdout, uuid)
// undeleter recovers collections from manifest text. It first makes sure
// every referenced block is present and safe from trashing on some keep
// service, then saves a new collection.
110 type undeleter struct {
// client is used for API requests (listing keep services, creating the
// collection) and for keep service index, untrash, and touch requests.
111 client *arvados.Client
// cluster supplies the Collections blob-signing settings
// (BlobSigningTTL, BlobSigning, BlobSigningKey).
112 cluster *arvados.Cluster
// logger is the base logger; per-block and per-service fields are added
// to it where needed.
113 logger logrus.FieldLogger
// errNotFound is returned by newestMtime when a keep service's index has
// no entry for the requested block.
116 var errNotFound = errors.New("not found")
118 // Return the timestamp of the newest copy of blk on svc. The second
119 // return value is errNotFound if blk is not on svc at all, or the svc.Index error if fetching the index fails.
121 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
// Fetch svc's index entries for blk. NOTE(review): blk is passed as the
// Index argument, presumably a hash prefix; confirm against
// KeepService.Index.
122 found, err := svc.Index(und.client, blk)
// If the index request failed, warn and return the error unchanged.
124 logger.WithError(err).Warn("error getting index")
125 return time.Time{}, err
126 } else if len(found) == 0 {
// svc has no copy of blk.
127 return time.Time{}, errNotFound
130 for _, ent := range found {
// Mtime is interpreted as nanoseconds since the Unix epoch.
131 t := time.Unix(0, ent.Mtime)
// latest is presumably the newest t seen in the loop above (the
// comparison lines are not visible here).
136 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective is returned by ensureSafe when svc.Touch reports
// success but the block's newest timestamp is still too old.
140 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
142 // Ensure the given block exists on the given server and won't be
143 // eligible for trashing until after our chosen deadline (blobsigexp).
144 // Returns an error if the block doesn't exist on the given server, or
145 // has an old timestamp and can't be updated. Index lookup errors are also logged via logger.
148 // After we decide a block is "safe" (whether or not we had to untrash
149 // it), keep-balance might notice that it's currently unreferenced and
150 // decide to trash it, all before our recovered collection gets
151 // saved. But if the block's timestamp is more recent than blobsigttl,
152 // keepstore will refuse to trash it even if told to by keep-balance.
153 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
// A copy counts as safe when latest+blobsigttl is after blobsigexp. Its
// timestamp will then still be within blobsigttl at the save deadline.
154 if latest, err := und.newestMtime(logger, blk, svc); err != nil {
156 } else if latest.Add(blobsigttl).After(blobsigexp) {
// If the timestamp is too old, ask svc to refresh it.
159 if err := svc.Touch(ctx, und.client, blk); err != nil {
// NOTE(review): %s flattens err into text; %w would let callers
// inspect it with errors.Is/errors.As.
160 return fmt.Errorf("error updating timestamp: %s", err)
162 logger.Debug("updated timestamp")
// Re-read the index to confirm the touch actually advanced the
// timestamp far enough.
163 if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
164 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
165 } else if err != nil {
167 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Touch reported success but the timestamp is still too old.
170 return errTouchIneffective
174 // Untrash and update GC timestamps (as needed) on blocks referenced
175 // by the given manifest, save a new collection and return the new
176 // collection's UUID.
177 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
178 ctx, cancel := context.WithCancel(context.Background())
// List the size-qualified block locators referenced by the manifest.
181 coll := arvados.Collection{ManifestText: mtxt}
182 blks, err := coll.SizedDigests()
// Queue every block index for the workers below. The channel buffer
// holds all of them, so queuing does not block.
186 todo := make(chan int, len(blks))
187 for idx := range blks {
// Collect every keep service except proxies.
192 var services []arvados.KeepService
193 err = und.client.EachKeepService(func(svc arvados.KeepService) error {
194 if svc.ServiceType == "proxy" {
195 und.logger.WithField("service", svc).Debug("ignore proxy service")
197 services = append(services, svc)
202 return "", fmt.Errorf("error getting list of keep services: %s", err)
204 und.logger.WithField("services", services).Debug("got list of services")
206 // Choose a deadline for saving a rescued collection.
// The deadline is half of BlobSigningTTL from now. blobsigexp is also
// the signature expiry used when re-signing the manifest below.
207 blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
208 blobsigexp := time.Now().Add(blobsigttl / 2)
209 und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
// blkFound is indexed like blks. Entries are presumably set by the
// workers once a block is confirmed safe on some service (those lines
// are not visible here).
211 blkFound := make([]bool, len(blks))
212 var wg sync.WaitGroup
// Start two workers per keep service, each draining block indexes from
// todo.
213 for i := 0; i < 2*len(services); i++ {
218 for idx := range todo {
// Keep only the hash part of the locator (drop size and hints).
219 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
220 logger := und.logger.WithField("block", blk)
// The first pass checks every service without untrashing. Only the
// second pass (untrashing == true) calls svc.Untrash before checking.
221 for _, untrashing := range []bool{false, true} {
222 for _, svc := range services {
223 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
225 if err := svc.Untrash(ctx, und.client, blk); err != nil {
226 logger.WithError(err).Debug("untrash failed")
229 logger.Info("untrashed")
// Make sure this service's copy won't be trashed before blobsigexp.
231 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
232 if err == errNotFound {
234 } else if err != nil {
242 logger.Debug("unrecoverable")
// Tally recovered (have) and missing (havenot) blocks.
248 var have, havenot int
249 for _, ok := range blkFound {
// Missing blocks abort the recovery; no partial collection is saved.
258 und.logger.Warn("partial recovery is not implemented")
260 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// When blob signing is enabled, re-sign the manifest's locators with the
// client's token, expiring at blobsigexp.
263 if und.cluster.Collections.BlobSigning {
264 key := []byte(und.cluster.Collections.BlobSigningKey)
265 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
// NOTE(review): at debug level this logs the full manifest, including
// any permission signatures just added.
267 und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
// Save a new collection from the manifest. The API response is decoded
// into coll, which supplies the new UUID.
268 err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
269 "collection": map[string]interface{}{
270 "manifest_text": coll.ManifestText,
274 return "", fmt.Errorf("error saving new collection: %s", err)
276 und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
277 return coll.UUID, nil