Merge branch '14406-remote-data-in-output'
[arvados.git] / services / keepstore / pipe_adapters.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "io"
11         "io/ioutil"
12 )
13
14 // getWithPipe invokes getter and copies the resulting data into
15 // buf. If ctx is done before all data is copied, getWithPipe closes
16 // the pipe with an error, and returns early with an error.
17 func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) {
18         piper, pipew := io.Pipe()
19         go func() {
20                 pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew))
21         }()
22         done := make(chan struct{})
23         var size int
24         var err error
25         go func() {
26                 size, err = io.ReadFull(piper, buf)
27                 if err == io.EOF || err == io.ErrUnexpectedEOF {
28                         err = nil
29                 }
30                 close(done)
31         }()
32         select {
33         case <-ctx.Done():
34                 piper.CloseWithError(ctx.Err())
35                 return 0, ctx.Err()
36         case <-done:
37                 piper.Close()
38                 return size, err
39         }
40 }
41
42 // putWithPipe invokes putter with a new pipe, and and copies data
43 // from buf into the pipe. If ctx is done before all data is copied,
44 // putWithPipe closes the pipe with an error, and returns early with
45 // an error.
46 func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error {
47         piper, pipew := io.Pipe()
48         copyErr := make(chan error)
49         go func() {
50                 _, err := io.Copy(pipew, bytes.NewReader(buf))
51                 copyErr <- err
52                 close(copyErr)
53         }()
54
55         putErr := make(chan error, 1)
56         go func() {
57                 putErr <- bw.WriteBlock(ctx, loc, piper)
58                 close(putErr)
59         }()
60
61         var err error
62         select {
63         case err = <-copyErr:
64         case err = <-putErr:
65         case <-ctx.Done():
66                 err = ctx.Err()
67         }
68
69         // Ensure io.Copy goroutine isn't blocked writing to pipew
70         // (otherwise, io.Copy is still using buf so it isn't safe to
71         // return). This can cause pipew to receive corrupt data if
72         // err came from copyErr or ctx.Done() before the copy
73         // finished. That's OK, though: in that case err != nil, and
74         // CloseWithErr(err) ensures putter() will get an error from
75         // piper.Read() before seeing EOF.
76         go pipew.CloseWithError(err)
77         go io.Copy(ioutil.Discard, piper)
78         <-copyErr
79
80         // Note: io.Copy() is finished now, but putter() might still
81         // be running. If we encounter an error before putter()
82         // returns, we return right away without waiting for putter().
83
84         if err != nil {
85                 return err
86         }
87         select {
88         case <-ctx.Done():
89                 return ctx.Err()
90         case err = <-putErr:
91                 return err
92         }
93 }