refactor as procedural
[arvados.git] / services / keepstore / pipe_adapters.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "io"
7         "io/ioutil"
8 )
9
10 // getWithPipe invokes getter and copies the resulting data into
11 // buf. If ctx is done before all data is copied, getWithPipe closes
12 // the pipe with an error, and returns early with an error.
13 func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) {
14         piper, pipew := io.Pipe()
15         go func() {
16                 pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew))
17         }()
18         done := make(chan struct{})
19         var size int
20         var err error
21         go func() {
22                 size, err = io.ReadFull(piper, buf)
23                 if err == io.EOF || err == io.ErrUnexpectedEOF {
24                         err = nil
25                 }
26                 close(done)
27         }()
28         select {
29         case <-ctx.Done():
30                 piper.CloseWithError(ctx.Err())
31                 return 0, ctx.Err()
32         case <-done:
33                 piper.Close()
34                 return size, err
35         }
36 }
37
38 // putWithPipe invokes putter with a new pipe, and and copies data
39 // from buf into the pipe. If ctx is done before all data is copied,
40 // putWithPipe closes the pipe with an error, and returns early with
41 // an error.
42 func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error {
43         piper, pipew := io.Pipe()
44         copyErr := make(chan error)
45         go func() {
46                 _, err := io.Copy(pipew, bytes.NewReader(buf))
47                 copyErr <- err
48                 close(copyErr)
49         }()
50
51         putErr := make(chan error, 1)
52         go func() {
53                 putErr <- bw.WriteBlock(ctx, loc, piper)
54                 close(putErr)
55         }()
56
57         var err error
58         select {
59         case err = <-copyErr:
60         case err = <-putErr:
61         case <-ctx.Done():
62                 err = ctx.Err()
63         }
64
65         // Ensure io.Copy goroutine isn't blocked writing to pipew
66         // (otherwise, io.Copy is still using buf so it isn't safe to
67         // return). This can cause pipew to receive corrupt data if
68         // err came from copyErr or ctx.Done() before the copy
69         // finished. That's OK, though: in that case err != nil, and
70         // CloseWithErr(err) ensures putter() will get an error from
71         // piper.Read() before seeing EOF.
72         go pipew.CloseWithError(err)
73         go io.Copy(ioutil.Discard, piper)
74         <-copyErr
75
76         // Note: io.Copy() is finished now, but putter() might still
77         // be running. If we encounter an error before putter()
78         // returns, we return right away without waiting for putter().
79
80         if err != nil {
81                 return err
82         }
83         select {
84         case <-ctx.Done():
85                 return ctx.Err()
86         case err = <-putErr:
87                 return err
88         }
89 }