Merge branch 'master' into 12018-sync-groups-tool
[arvados.git] / sdk / go / crunchrunner / upload.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "io"
13         "log"
14         "os"
15         "path/filepath"
16         "sort"
17         "strings"
18
19         "git.curoverse.com/arvados.git/sdk/go/keepclient"
20         "git.curoverse.com/arvados.git/sdk/go/manifest"
21 )
22
// Block is an in-memory buffer that accumulates stream data. Once it holds
// keepclient.BLOCKSIZE bytes (or the stream is finished) it is handed to
// the stream's upload goroutine.
//
// Field order is significant to any positional (unkeyed) composite literal
// of this type.
type Block struct {
	data   []byte // backing buffer; only data[:offset] holds payload
	offset int64  // number of bytes written into data so far
}
27
// ManifestStreamWriter accumulates the contents of every file that belongs
// to one manifest stream (one directory of the tree being uploaded). Data
// written to it is packed into Blocks. These are passed over uploader to
// the goUpload goroutine, which reports its upload errors on finish once
// uploader has been closed.
//
// Field order is significant to any positional (unkeyed) composite literal
// of this type.
type ManifestStreamWriter struct {
	// Owning writer; supplies the Keep client used by goUpload.
	*ManifestWriter
	// Stream being built: goUpload appends block locators to its Blocks,
	// and WalkFunc appends one entry per file to its FileStreamSegments.
	*manifest.ManifestStream
	// Bytes of file data written to this stream so far, used as the start
	// position of the next file segment. This field hides the embedded
	// Block's offset, so the block's offset must be accessed explicitly as
	// m.Block.offset.
	offset int64
	// Partially-filled block not yet handed to the uploader, or nil.
	*Block
	// Blocks to upload; closed and then set to nil by Finish.
	uploader chan *Block
	// Receives goUpload's collected upload errors after uploader is closed.
	finish chan []error
}
36
// IKeepClient is the subset of the Keep client API this uploader needs.
// PutHB is called with the MD5 hex digest of buf and buf itself. The
// returned string is recorded as the block's locator in the manifest; the
// int result is ignored by this file.
type IKeepClient interface {
	PutHB(hash string, buf []byte) (string, int, error)
}
40
41 func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
42         n, err := m.ReadFrom(bytes.NewReader(p))
43         return int(n), err
44 }
45
46 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
47         var total int64
48         var count int
49
50         for err == nil {
51                 if m.Block == nil {
52                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
53                 }
54                 count, err = r.Read(m.Block.data[m.Block.offset:])
55                 total += int64(count)
56                 m.Block.offset += int64(count)
57                 if m.Block.offset == keepclient.BLOCKSIZE {
58                         m.uploader <- m.Block
59                         m.Block = nil
60                 }
61         }
62
63         if err == io.EOF {
64                 return total, nil
65         } else {
66                 return total, err
67         }
68
69 }
70
71 func (m *ManifestStreamWriter) goUpload() {
72         var errors []error
73         uploader := m.uploader
74         finish := m.finish
75         for block := range uploader {
76                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
77                 signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
78                 if err != nil {
79                         errors = append(errors, err)
80                 } else {
81                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
82                 }
83         }
84         finish <- errors
85 }
86
// ManifestWriter builds a manifest for a directory tree. WriteTree drives
// it by passing WalkFunc to filepath.Walk, then calling Finish and
// ManifestText.
//
// Field order is significant to any positional (unkeyed) composite literal
// of this type.
type ManifestWriter struct {
	// Keep client used by every stream's upload goroutine.
	IKeepClient
	// Walk root; WalkFunc derives stream names relative to it.
	stripPrefix string
	// One writer per stream, keyed by stream (directory) name; "." is the
	// top level of the tree.
	Streams map[string]*ManifestStreamWriter
}
92
93 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
94         if err != nil {
95                 return err
96         }
97
98         targetPath, targetInfo := path, info
99         if info.Mode()&os.ModeSymlink != 0 {
100                 // Update targetpath/info to reflect the symlink
101                 // target, not the symlink itself
102                 targetPath, err = filepath.EvalSymlinks(path)
103                 if err != nil {
104                         return err
105                 }
106                 targetInfo, err = os.Stat(targetPath)
107                 if err != nil {
108                         return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
109                 }
110         }
111
112         if targetInfo.Mode()&os.ModeType != 0 {
113                 // Skip directories, pipes, other non-regular files
114                 return nil
115         }
116
117         var dir string
118         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
119                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
120         }
121         if dir == "" {
122                 dir = "."
123         }
124
125         fn := path[(len(path) - len(info.Name())):]
126
127         if m.Streams[dir] == nil {
128                 m.Streams[dir] = &ManifestStreamWriter{
129                         m,
130                         &manifest.ManifestStream{StreamName: dir},
131                         0,
132                         nil,
133                         make(chan *Block),
134                         make(chan []error)}
135                 go m.Streams[dir].goUpload()
136         }
137
138         stream := m.Streams[dir]
139
140         fileStart := stream.offset
141
142         file, err := os.Open(path)
143         if err != nil {
144                 return err
145         }
146
147         log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
148
149         var count int64
150         count, err = io.Copy(stream, file)
151         if err != nil {
152                 return err
153         }
154
155         stream.offset += count
156
157         stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
158                 manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
159
160         return nil
161 }
162
163 func (m *ManifestWriter) Finish() error {
164         var errstring string
165         for _, stream := range m.Streams {
166                 if stream.uploader == nil {
167                         continue
168                 }
169                 if stream.Block != nil {
170                         stream.uploader <- stream.Block
171                 }
172                 close(stream.uploader)
173                 stream.uploader = nil
174
175                 errors := <-stream.finish
176                 close(stream.finish)
177                 stream.finish = nil
178
179                 for _, r := range errors {
180                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
181                 }
182         }
183         if errstring != "" {
184                 return errors.New(errstring)
185         } else {
186                 return nil
187         }
188 }
189
190 func (m *ManifestWriter) ManifestText() string {
191         m.Finish()
192         var buf bytes.Buffer
193
194         dirs := make([]string, len(m.Streams))
195         i := 0
196         for k := range m.Streams {
197                 dirs[i] = k
198                 i++
199         }
200         sort.Strings(dirs)
201
202         for _, k := range dirs {
203                 v := m.Streams[k]
204
205                 if k == "." {
206                         buf.WriteString(".")
207                 } else {
208                         k = strings.Replace(k, " ", "\\040", -1)
209                         k = strings.Replace(k, "\n", "", -1)
210                         buf.WriteString("./" + k)
211                 }
212                 for _, b := range v.Blocks {
213                         buf.WriteString(" ")
214                         buf.WriteString(b)
215                 }
216                 for _, f := range v.FileStreamSegments {
217                         buf.WriteString(" ")
218                         name := strings.Replace(f.Name, " ", "\\040", -1)
219                         name = strings.Replace(name, "\n", "", -1)
220                         buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
221                 }
222                 buf.WriteString("\n")
223         }
224         return buf.String()
225 }
226
227 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
228         mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
229         err = filepath.Walk(root, mw.WalkFunc)
230
231         if err != nil {
232                 return "", err
233         }
234
235         err = mw.Finish()
236         if err != nil {
237                 return "", err
238         }
239
240         return mw.ManifestText(), nil
241 }