X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3400559165e50e3d62adf6d45f9970a13450d907..eae48c31bb338689ec67fbc6a14a2e0b1fb5e3b6:/services/keepstore/collision.go diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go index 210286ad75..82cb789eb9 100644 --- a/services/keepstore/collision.go +++ b/services/keepstore/collision.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "context" "crypto/md5" "fmt" "io" @@ -35,7 +37,7 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro } var err error for rdr != nil && err == nil { - buf := make([]byte, 1 << 18) + buf := make([]byte, 1<<18) var n int n, err = rdr.Read(buf) data <- buf[:n] @@ -47,3 +49,48 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro } return <-outcome } + +func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error { + bufLen := 1 << 20 + if bufLen > len(expect) && len(expect) > 0 { + // No need for bufLen to be longer than + // expect, except that len(buf)==0 would + // prevent us from handling empty readers the + // same way as non-empty readers: reading 0 + // bytes at a time never reaches EOF. + bufLen = len(expect) + } + buf := make([]byte, bufLen) + cmp := expect + + // Loop invariants: all data read so far matched what + // we expected, and the first N bytes of cmp are + // expected to equal the next N bytes read from + // rdr. + for { + ready := make(chan bool) + var n int + var err error + go func() { + n, err = rdr.Read(buf) + close(ready) + }() + select { + case <-ready: + case <-ctx.Done(): + return ctx.Err() + } + if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 { + return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr) + } + cmp = cmp[n:] + if err == io.EOF { + if len(cmp) != 0 { + return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil) + } + return nil + } else if err != nil { + return err + } + } +}