7582: Uploader mostly done, writing tests
[arvados.git] / sdk / go / crunchrunner / upload.go
1 package main
2
import (
	"bytes"
	"crypto/md5"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"sort"

	"git.curoverse.com/arvados.git/sdk/go/keepclient"
	"git.curoverse.com/arvados.git/sdk/go/manifest"
)
14
// Block is a buffer that collects stream data before it is handed to the
// uploader channel.
type Block struct {
	// data is the backing buffer; only data[:offset] holds bytes that
	// have actually been read into the block.
	data []byte
	// offset is the number of bytes stored in data so far, and therefore
	// also the position at which the next read continues.
	offset int64
}
19
20 type ManifestStreamWriter struct {
21         *ManifestWriter
22         *manifest.ManifestStream
23         offset int64
24         *Block
25         uploader chan *Block
26 }
27
// IKeepClient is the part of the Keep client that the uploader relies on.
// Declaring it as an interface lets callers substitute another
// implementation.
type IKeepClient interface {
	// PutHB stores buf under the given hash. The first result is the
	// locator that gets recorded in the manifest.
	// NOTE(review): the int result is ignored by this file; presumably
	// it is a replica count -- confirm against keepclient.
	PutHB(hash string, buf []byte) (string, int, error)
}
31
32 func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
33         // Needed to conform to Writer interface, but not implemented
34         // because io.Copy will actually use ReadFrom instead.
35         return 0, nil
36 }
37
38 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
39         var total int64
40         var count int
41
42         for err == nil {
43                 if m.Block == nil {
44                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
45                 }
46                 count, err = r.Read(m.Block.data[m.Block.offset:])
47                 total += int64(count)
48                 m.Block.offset += int64(count)
49                 if count > 0 {
50                         if m.Block.offset == keepclient.BLOCKSIZE {
51                                 m.uploader <- m.Block
52                                 m.Block = nil
53                         }
54                 }
55         }
56
57         return total, err
58 }
59
60 func (m *ManifestStreamWriter) goUpload() {
61         select {
62         case block, valid := <-m.uploader:
63                 if !valid {
64                         return
65                 }
66                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
67                 signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
68                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
69         }
70
71 }
72
73 type ManifestWriter struct {
74         IKeepClient
75         stripPrefix string
76         Streams     map[string]*ManifestStreamWriter
77 }
78
79 type walker struct {
80         currentDir string
81         m          *ManifestWriter
82 }
83
84 func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
85         log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
86
87         if info.IsDir() {
88                 if path == w.currentDir {
89                         return nil
90                 }
91                 return filepath.Walk(path, walker{path, w.m}.WalkFunc)
92         }
93         m := w.m
94
95         dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
96         fn := path[(len(path) - len(info.Name())):]
97
98         if m.Streams[dir] == nil {
99                 m.Streams[dir] = &ManifestStreamWriter{
100                         m,
101                         &manifest.ManifestStream{StreamName: dir},
102                         0,
103                         nil,
104                         make(chan *Block)}
105                 go m.Streams[dir].goUpload()
106         }
107
108         stream := m.Streams[dir]
109
110         fileStart := stream.offset
111
112         file, err := os.Open(path)
113         if err != nil {
114                 return err
115         }
116
117         var count int64
118         count, err = io.Copy(stream, file)
119         if err != nil && err != io.EOF {
120                 return err
121         }
122
123         stream.offset += count
124
125         stream.ManifestStream.Files = append(stream.ManifestStream.Files,
126                 fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
127
128         return nil
129 }
130
131 func (m *ManifestWriter) Finish() {
132         for _, v := range m.Streams {
133                 if v.uploader != nil {
134                         if v.Block != nil {
135                                 v.uploader <- v.Block
136                         }
137                         close(v.uploader)
138                         v.uploader = nil
139                 }
140         }
141 }
142
143 func (m *ManifestWriter) ManifestText() string {
144         m.Finish()
145         var buf bytes.Buffer
146         for k, v := range m.Streams {
147                 if k == "" {
148                         buf.WriteString(".")
149                 } else {
150                         buf.WriteString("./" + k)
151                 }
152                 for _, b := range v.Blocks {
153                         buf.WriteString(" ")
154                         buf.WriteString(b)
155                 }
156                 for _, f := range v.Files {
157                         buf.WriteString(" ")
158                         buf.WriteString(f)
159                 }
160                 buf.WriteString("\n")
161         }
162         return buf.String()
163 }
164
165 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
166         mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
167         err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
168         mw.Finish()
169
170         if err != nil {
171                 return "", err
172         } else {
173                 return mw.ManifestText(), nil
174         }
175
176 }