Merge branch 'master' into 8876-work-unit
[arvados.git] / services / keepstore / collision.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "io"
8 )
9
10 // Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
11 // all bytes readable from rdr). If all data is read successfully,
12 // return DiskHashError or CollisionError depending on whether it
13 // matches expectMD5. If an error occurs while reading, return that
14 // error.
15 //
16 // "content has expected MD5" is called a collision because this
17 // function is used in cases where we have another block in hand with
18 // the given MD5 but different content.
19 func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
20         outcome := make(chan error)
21         data := make(chan []byte, 1)
22         go func() {
23                 h := md5.New()
24                 for b := range data {
25                         h.Write(b)
26                 }
27                 if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
28                         outcome <- CollisionError
29                 } else {
30                         outcome <- DiskHashError
31                 }
32         }()
33         data <- buf1
34         if buf2 != nil {
35                 data <- buf2
36         }
37         var err error
38         for rdr != nil && err == nil {
39                 buf := make([]byte, 1<<18)
40                 var n int
41                 n, err = rdr.Read(buf)
42                 data <- buf[:n]
43         }
44         close(data)
45         if rdr != nil && err != io.EOF {
46                 <-outcome
47                 return err
48         }
49         return <-outcome
50 }
51
52 func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
53         bufLen := 1 << 20
54         if bufLen > len(expect) && len(expect) > 0 {
55                 // No need for bufLen to be longer than
56                 // expect, except that len(buf)==0 would
57                 // prevent us from handling empty readers the
58                 // same way as non-empty readers: reading 0
59                 // bytes at a time never reaches EOF.
60                 bufLen = len(expect)
61         }
62         buf := make([]byte, bufLen)
63         cmp := expect
64
65         // Loop invariants: all data read so far matched what
66         // we expected, and the first N bytes of cmp are
67         // expected to equal the next N bytes read from
68         // rdr.
69         for {
70                 n, err := rdr.Read(buf)
71                 if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
72                         return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
73                 }
74                 cmp = cmp[n:]
75                 if err == io.EOF {
76                         if len(cmp) != 0 {
77                                 return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
78                         }
79                         return nil
80                 } else if err != nil {
81                         return err
82                 }
83         }
84 }