Merge branch '14324-cdc-azure' refs #14324
[arvados.git] / sdk / go / asyncbuf / buf.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package asyncbuf
6
7 import (
8         "bytes"
9         "io"
10         "sync"
11 )
12
13 // A Buffer is an io.Writer that distributes written data
14 // asynchronously to multiple concurrent readers.
15 //
16 // NewReader() can be called at any time. In all cases, every returned
17 // io.Reader reads all data written to the Buffer.
18 //
19 // Behavior is undefined if Write is called after Close or
20 // CloseWithError.
21 type Buffer interface {
22         io.WriteCloser
23
24         // NewReader() returns an io.Reader that reads all data
25         // written to the Buffer.
26         NewReader() io.Reader
27
28         // Close, but return the given error (instead of io.EOF) to
29         // all readers when they reach the end of the buffer.
30         //
31         // CloseWithError(nil) is equivalent to
32         // CloseWithError(io.EOF).
33         CloseWithError(error) error
34 }
35
36 type buffer struct {
37         data *bytes.Buffer
38         cond sync.Cond
39         err  error // nil if there might be more writes
40 }
41
42 // NewBuffer creates a new Buffer using buf as its initial
43 // contents. The new Buffer takes ownership of buf, and the caller
44 // should not use buf after this call.
45 func NewBuffer(buf []byte) Buffer {
46         return &buffer{
47                 data: bytes.NewBuffer(buf),
48                 cond: sync.Cond{L: &sync.Mutex{}},
49         }
50 }
51
52 func (b *buffer) Write(p []byte) (int, error) {
53         defer b.cond.Broadcast()
54         b.cond.L.Lock()
55         defer b.cond.L.Unlock()
56         if b.err != nil {
57                 return 0, b.err
58         }
59         return b.data.Write(p)
60 }
61
62 func (b *buffer) Close() error {
63         return b.CloseWithError(nil)
64 }
65
66 func (b *buffer) CloseWithError(err error) error {
67         defer b.cond.Broadcast()
68         b.cond.L.Lock()
69         defer b.cond.L.Unlock()
70         if err == nil {
71                 b.err = io.EOF
72         } else {
73                 b.err = err
74         }
75         return nil
76 }
77
78 func (b *buffer) NewReader() io.Reader {
79         return &reader{b: b}
80 }
81
82 type reader struct {
83         b    *buffer
84         read int // # bytes already read
85 }
86
87 func (r *reader) Read(p []byte) (int, error) {
88         r.b.cond.L.Lock()
89         for {
90                 switch {
91                 case r.read < r.b.data.Len():
92                         buf := r.b.data.Bytes()
93                         r.b.cond.L.Unlock()
94                         n := copy(p, buf[r.read:])
95                         r.read += n
96                         return n, nil
97                 case r.b.err != nil || len(p) == 0:
98                         // r.b.err != nil means we reached EOF.  And
99                         // even if we're not at EOF, there's no need
100                         // to block if len(p)==0.
101                         err := r.b.err
102                         r.b.cond.L.Unlock()
103                         return 0, err
104                 default:
105                         r.b.cond.Wait()
106                 }
107         }
108 }