Merge branch '18947-health'
[arvados.git] / services / keepstore / collision.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13 )
14
15 // Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
16 // all bytes readable from rdr). If all data is read successfully,
17 // return DiskHashError or CollisionError depending on whether it
18 // matches expectMD5. If an error occurs while reading, return that
19 // error.
20 //
21 // "content has expected MD5" is called a collision because this
22 // function is used in cases where we have another block in hand with
23 // the given MD5 but different content.
24 func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
25         outcome := make(chan error)
26         data := make(chan []byte, 1)
27         go func() {
28                 h := md5.New()
29                 for b := range data {
30                         h.Write(b)
31                 }
32                 if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
33                         outcome <- CollisionError
34                 } else {
35                         outcome <- DiskHashError
36                 }
37         }()
38         data <- buf1
39         if buf2 != nil {
40                 data <- buf2
41         }
42         var err error
43         for rdr != nil && err == nil {
44                 buf := make([]byte, 1<<18)
45                 var n int
46                 n, err = rdr.Read(buf)
47                 data <- buf[:n]
48         }
49         close(data)
50         if rdr != nil && err != io.EOF {
51                 <-outcome
52                 return err
53         }
54         return <-outcome
55 }
56
57 func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
58         bufLen := 1 << 20
59         if bufLen > len(expect) && len(expect) > 0 {
60                 // No need for bufLen to be longer than
61                 // expect, except that len(buf)==0 would
62                 // prevent us from handling empty readers the
63                 // same way as non-empty readers: reading 0
64                 // bytes at a time never reaches EOF.
65                 bufLen = len(expect)
66         }
67         buf := make([]byte, bufLen)
68         cmp := expect
69
70         // Loop invariants: all data read so far matched what
71         // we expected, and the first N bytes of cmp are
72         // expected to equal the next N bytes read from
73         // rdr.
74         for {
75                 ready := make(chan bool)
76                 var n int
77                 var err error
78                 go func() {
79                         n, err = rdr.Read(buf)
80                         close(ready)
81                 }()
82                 select {
83                 case <-ready:
84                 case <-ctx.Done():
85                         return ctx.Err()
86                 }
87                 if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
88                         return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
89                 }
90                 cmp = cmp[n:]
91                 if err == io.EOF {
92                         if len(cmp) != 0 {
93                                 return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
94                         }
95                         return nil
96                 } else if err != nil {
97                         return err
98                 }
99         }
100 }