16427: Return error instead of ok bool from util funcs.
[arvados.git] / lib / undelete / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package undelete
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/sirupsen/logrus"
22 )
23
24 var Command command
25
26 type command struct{}
27
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         logger := ctxlog.New(stderr, "text", "info")
31         defer func() {
32                 if err != nil {
33                         logger.WithError(err).Error("fatal")
34                 }
35                 logger.Info("exiting")
36         }()
37
38         loader := config.NewLoader(stdin, logger)
39
40         flags := flag.NewFlagSet("", flag.ContinueOnError)
41         flags.SetOutput(stderr)
42         loader.SetupFlags(flags)
43         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         }
51
52         if len(flags.Args()) == 0 {
53                 fmt.Fprintf(stderr, "Usage: %s [options] uuid_or_file ...\n", prog)
54                 flags.PrintDefaults()
55                 return 2
56         }
57
58         lvl, err := logrus.ParseLevel(*loglevel)
59         if err != nil {
60                 return 2
61         }
62         logger.SetLevel(lvl)
63
64         cfg, err := loader.Load()
65         if err != nil {
66                 return 1
67         }
68         cluster, err := cfg.GetCluster("")
69         if err != nil {
70                 return 1
71         }
72         client, err := arvados.NewClientFromConfig(cluster)
73         if err != nil {
74                 return 1
75         }
76         client.AuthToken = cluster.SystemRootToken
77         und := undeleter{
78                 client:  client,
79                 cluster: cluster,
80                 logger:  logger,
81         }
82
83         exitcode := 0
84         for _, src := range flags.Args() {
85                 logger := logger.WithField("src", src)
86                 if len(src) == 27 && src[5:12] == "-57u5n-" {
87                         logger.Error("log entry lookup not implemented")
88                         exitcode = 1
89                         continue
90                 } else {
91                         mtxt, err := ioutil.ReadFile(src)
92                         if err != nil {
93                                 logger.WithError(err).Error("error loading manifest data")
94                                 exitcode = 1
95                                 continue
96                         }
97                         uuid, err := und.RecoverManifest(string(mtxt))
98                         if err != nil {
99                                 logger.WithError(err).Error("recovery failed")
100                                 exitcode = 1
101                                 continue
102                         }
103                         logger.WithField("UUID", uuid).Info("recovery succeeded")
104                         fmt.Fprintln(stdout, uuid)
105                 }
106         }
107         return exitcode
108 }
109
110 type undeleter struct {
111         client  *arvados.Client
112         cluster *arvados.Cluster
113         logger  logrus.FieldLogger
114 }
115
116 var errNotFound = errors.New("not found")
117
118 // Return the timestamp of the newest copy of blk on svc. Second
119 // return value is false if blk is not on svc at all, or an error
120 // occurs.
121 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
122         found, err := svc.Index(und.client, blk)
123         if err != nil {
124                 logger.WithError(err).Warn("error getting index")
125                 return time.Time{}, err
126         } else if len(found) == 0 {
127                 return time.Time{}, errNotFound
128         }
129         var latest time.Time
130         for _, ent := range found {
131                 t := time.Unix(0, ent.Mtime)
132                 if t.After(latest) {
133                         latest = t
134                 }
135         }
136         logger.WithField("latest", latest).Debug("found")
137         return latest, nil
138 }
139
140 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
141
142 // Ensure the given block exists on the given server and won't be
143 // eligible for trashing until after our chosen deadline (blobsigexp).
144 // Returns an error if the block doesn't exist on the given server, or
145 // has an old timestamp and can't be updated.  Reports errors via
146 // logger.
147 //
148 // After we decide a block is "safe" (whether or not we had to untrash
149 // it), keep-balance might notice that it's currently unreferenced and
150 // decide to trash it, all before our recovered collection gets
151 // saved. But if the block's timestamp is more recent than blobsigttl,
152 // keepstore will refuse to trash it even if told to by keep-balance.
153 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
154         if latest, err := und.newestMtime(logger, blk, svc); err != nil {
155                 return err
156         } else if latest.Add(blobsigttl).After(blobsigexp) {
157                 return nil
158         }
159         if err := svc.Touch(ctx, und.client, blk); err != nil {
160                 return fmt.Errorf("error updating timestamp: %s", err)
161         }
162         logger.Debug("updated timestamp")
163         if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
164                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
165         } else if err != nil {
166                 return err
167         } else if latest.Add(blobsigttl).After(blobsigexp) {
168                 return nil
169         } else {
170                 return errTouchIneffective
171         }
172 }
173
174 // Untrash and update GC timestamps (as needed) on blocks referenced
175 // by the given manifest, save a new collection and return the new
176 // collection's UUID.
177 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
178         ctx, cancel := context.WithCancel(context.Background())
179         defer cancel()
180
181         coll := arvados.Collection{ManifestText: mtxt}
182         blks, err := coll.SizedDigests()
183         if err != nil {
184                 return "", err
185         }
186         todo := make(chan int, len(blks))
187         for idx := range blks {
188                 todo <- idx
189         }
190         go close(todo)
191
192         var services []arvados.KeepService
193         err = und.client.EachKeepService(func(svc arvados.KeepService) error {
194                 if svc.ServiceType == "proxy" {
195                         und.logger.WithField("service", svc).Debug("ignore proxy service")
196                 } else {
197                         services = append(services, svc)
198                 }
199                 return nil
200         })
201         if err != nil {
202                 return "", fmt.Errorf("error getting list of keep services: %s", err)
203         }
204         und.logger.WithField("services", services).Debug("got list of services")
205
206         // Choose a deadline for saving a rescued collection.
207         blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
208         blobsigexp := time.Now().Add(blobsigttl / 2)
209         und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
210
211         blkFound := make([]bool, len(blks))
212         var wg sync.WaitGroup
213         for i := 0; i < 2*len(services); i++ {
214                 wg.Add(1)
215                 go func() {
216                         defer wg.Done()
217                 nextblk:
218                         for idx := range todo {
219                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
220                                 logger := und.logger.WithField("block", blk)
221                                 for _, untrashing := range []bool{false, true} {
222                                         for _, svc := range services {
223                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
224                                                 if untrashing {
225                                                         if err := svc.Untrash(ctx, und.client, blk); err != nil {
226                                                                 logger.WithError(err).Debug("untrash failed")
227                                                                 continue
228                                                         }
229                                                         logger.Info("untrashed")
230                                                 }
231                                                 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
232                                                 if err == errNotFound {
233                                                         logger.Debug(err)
234                                                 } else if err != nil {
235                                                         logger.Error(err)
236                                                 } else {
237                                                         blkFound[idx] = true
238                                                         continue nextblk
239                                                 }
240                                         }
241                                 }
242                                 logger.Debug("unrecoverable")
243                         }
244                 }()
245         }
246         wg.Wait()
247
248         var have, havenot int
249         for _, ok := range blkFound {
250                 if ok {
251                         have++
252                 } else {
253                         havenot++
254                 }
255         }
256         if havenot > 0 {
257                 if have > 0 {
258                         und.logger.Warn("partial recovery is not implemented")
259                 }
260                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
261         }
262
263         if und.cluster.Collections.BlobSigning {
264                 key := []byte(und.cluster.Collections.BlobSigningKey)
265                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
266         }
267         und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
268         err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
269                 "collection": map[string]interface{}{
270                         "manifest_text": coll.ManifestText,
271                 },
272         })
273         if err != nil {
274                 return "", fmt.Errorf("error saving new collection: %s", err)
275         }
276         und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
277         return coll.UUID, nil
278 }