var d BlockDigest
d.h, err = strconv.ParseUint(s[:16], 16, 64)
- if err != nil {return}
+ if err != nil {
+ return
+ }
d.l, err = strconv.ParseUint(s[16:], 16, 64)
- if err != nil {return}
+ if err != nil {
+ return
+ }
dig = d
return
}
)
type LoggerParams struct {
- Client arvadosclient.ArvadosClient // The client we use to write log entries
- EventType string // The event type to assign to the log entry.
- MinimumWriteInterval time.Duration // Wait at least this long between log writes
+ Client arvadosclient.ArvadosClient // The client we use to write log entries
+ EventType string // The event type to assign to the log entry.
+ MinimumWriteInterval time.Duration // Wait at least this long between log writes
}
// A Logger is used to build up a log entry over time and write every
// version of it.
type Logger struct {
// The Data we write
- data map[string]interface{} // The entire map that we give to the api
- entry map[string]interface{} // Convenience shortcut into data
- properties map[string]interface{} // Convenience shortcut into data
+ data map[string]interface{} // The entire map that we give to the api
+ entry map[string]interface{} // Convenience shortcut into data
+ properties map[string]interface{} // Convenience shortcut into data
- lock sync.Locker // Synchronizes editing and writing
- params LoggerParams // Parameters we were given
+ lock sync.Locker // Synchronizes editing and writing
+ params LoggerParams // Parameters we were given
- lastWrite time.Time // The last time we wrote a log entry
- modified bool // Has this data been modified since the last write
+ lastWrite time.Time // The last time we wrote a log entry
+ modified bool // Has this data been modified since the last write
- writeHooks []func(map[string]interface{},map[string]interface{})
+ writeHooks []func(map[string]interface{}, map[string]interface{})
}
// Create a new logger based on the specified parameters.
func NewLogger(params LoggerParams) *Logger {
// TODO(misha): Add some params checking here.
l := &Logger{data: make(map[string]interface{}),
- lock: &sync.Mutex{},
+ lock: &sync.Mutex{},
params: params}
l.entry = make(map[string]interface{})
l.data["log"] = l.entry
// Get access to the maps you can edit. This will hold a lock until
// you call Record. Do not edit the maps in any other goroutines or
// after calling Record.
-// You don't need to edit both maps,
+// You don't need to edit both maps,
// properties can take any values you want to give it,
// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
// properties is a shortcut for entry["properties"].(map[string]interface{})
func (l *Logger) Edit() (properties map[string]interface{}, entry map[string]interface{}) {
l.lock.Lock()
- l.modified = true // We don't actually know the caller will modifiy the data, but we assume they will.
+	l.modified = true  // We don't actually know the caller will modify the data, but we assume they will.
return l.properties, l.entry
}
return l.lastWrite.Add(l.params.MinimumWriteInterval).Before(time.Now())
}
-
// Actually writes the log entry. This method assumes we're holding the lock.
func (l *Logger) write() {
l.modified = false
}
-
func (l *Logger) acquireLockConsiderWriting() {
l.lock.Lock()
if l.modified && l.writeAllowedNow() {
}
type BlockLocator struct {
- Digest blockdigest.BlockDigest
- Size int
- Hints []string
+ Digest blockdigest.BlockDigest
+ Size int
+ Hints []string
}
type ManifestLine struct {
- StreamName string
- Blocks []string
- Files []string
+ StreamName string
+ Blocks []string
+ Files []string
}
func ParseBlockLocator(s string) (b BlockLocator, err error) {
if !LocatorPattern.MatchString(s) {
- err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern " +
+ err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
"\"%s\".",
s,
LocatorPattern.String())
// We expect both of the following to succeed since LocatorPattern
// restricts the strings appropriately.
blockDigest, err = blockdigest.FromString(tokens[0])
- if err != nil {return}
+ if err != nil {
+ return
+ }
blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
- if err != nil {return}
+ if err != nil {
+ return
+ }
b.Digest = blockDigest
b.Size = int(blockSize)
b.Hints = tokens[2:]
return ch
}
-
// Blocks may appear mulitple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.
}
numAvailable, err = response.NumItemsAvailable()
if err != nil {
- log.Fatalf("Error retrieving number of items available from " +
+ log.Fatalf("Error retrieving number of items available from "+
"SDK response: %v",
err)
}
heap_profile_filename string
// globals for debugging
totalManifestSize uint64
- maxManifestSize uint64
+ maxManifestSize uint64
)
type Collection struct {
- Uuid string
- OwnerUuid string
- ReplicationLevel int
+ Uuid string
+ OwnerUuid string
+ ReplicationLevel int
BlockDigestToSize map[blockdigest.BlockDigest]int
- TotalSize int
+ TotalSize int
}
type ReadCollections struct {
- ReadAllCollections bool
- UuidToCollection map[string]Collection
+ ReadAllCollections bool
+ UuidToCollection map[string]Collection
OwnerToCollectionSize map[string]int
}
type GetCollectionsParams struct {
- Client arvadosclient.ArvadosClient
- Logger *logger.Logger
+ Client arvadosclient.ArvadosClient
+ Logger *logger.Logger
BatchSize int
}
type SdkCollectionInfo struct {
- Uuid string `json:"uuid"`
- OwnerUuid string `json:"owner_uuid"`
- Redundancy int `json:"redundancy"`
- ModifiedAt time.Time `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
+ Uuid string `json:"uuid"`
+ OwnerUuid string `json:"owner_uuid"`
+ Redundancy int `json:"redundancy"`
+ ModifiedAt time.Time `json:"modified_at"`
+ ManifestText string `json:"manifest_text"`
}
type SdkCollectionList struct {
- ItemsAvailable int `json:"items_available"`
- Items []SdkCollectionInfo `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Items []SdkCollectionInfo `json:"items"`
}
func init() {
- flag.StringVar(&heap_profile_filename,
+ flag.StringVar(&heap_profile_filename,
"heap-profile",
"",
"File to write the heap profiles to. Leave blank to skip profiling.")
}
}
-
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
ComputeSizeOfOwnedCollections(&results)
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
+ properties, _ := params.Logger.Edit()
collectionInfo := properties["collection_info"].(map[string]interface{})
collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
params.Logger.Record()
"modified_at"}
sdkParams := arvadosclient.Dict{
- "select": fieldsWanted,
- "order": []string{"modified_at ASC"},
+ "select": fieldsWanted,
+ "order": []string{"modified_at ASC"},
"filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
if params.BatchSize > 0 {
results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
+ properties, _ := params.Logger.Edit()
collectionInfo := make(map[string]interface{})
collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
collectionInfo["batch_size"] = params.BatchSize
// Process collection and update our date filter.
sdkParams["filters"].([][]string)[0][2] =
ProcessCollections(params.Logger,
- collections.Items,
- results.UuidToCollection).Format(time.RFC3339)
+ collections.Items,
+ results.UuidToCollection).Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
totalCollections = len(results.UuidToCollection)
- log.Printf("%d collections read, %d new in last batch, " +
+ log.Printf("%d collections read, %d new in last batch, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
totalCollections,
- totalCollections - previousTotalCollections,
+ totalCollections-previousTotalCollections,
sdkParams["filters"].([][]string)[0][2],
float32(totalManifestSize)/float32(totalCollections),
maxManifestSize, totalManifestSize)
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
+ properties, _ := params.Logger.Edit()
collectionInfo := properties["collection_info"].(map[string]interface{})
collectionInfo["collections_read"] = totalCollections
collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
return
}
-
// StrCopy returns a newly allocated string.
// It is useful to copy slices so that the garbage collector can reuse
// the memory of the longer strings they came from.
return string([]byte(s))
}
-
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
for _, sdkCollection := range receivedCollections {
collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
- OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
- ReplicationLevel: sdkCollection.Redundancy,
+ OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
+ ReplicationLevel: sdkCollection.Redundancy,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf(
- "Arvados SDK collection returned with unexpected zero " +
- "modifcation date. This probably means that either we failed to " +
- "parse the modification date or the API server has changed how " +
+ "Arvados SDK collection returned with unexpected zero "+
+ "modifcation date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
"it returns modification dates: %v",
collection))
}
if manifestSize > maxManifestSize {
maxManifestSize = manifestSize
}
-
+
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
- if stored_size, stored := collection.BlockDigestToSize[block.Digest];
- stored && stored_size != block.Size {
+ if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
message := fmt.Sprintf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.Uuid,
return
}
-
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
+func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int {
var collections SdkCollectionList
sdkParams := arvadosclient.Dict{"limit": 0}
err := client.List("collections", sdkParams, &collections)
return collections.ItemsAvailable
}
-
func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
readCollections.OwnerToCollectionSize = make(map[string]int)
for _, coll := range readCollections.UuidToCollection {
)
var (
- logEventType string
+ logEventType string
logFrequencySeconds int
)
func init() {
- flag.StringVar(&logEventType,
+ flag.StringVar(&logEventType,
"log-event-type",
"experimental-data-manager-report",
"event_type to use in our arvados log entries. Set to empty to turn off logging")
- flag.IntVar(&logFrequencySeconds,
+ flag.IntVar(&logFrequencySeconds,
"log-frequency-seconds",
20,
"How frequently we'll write log entries in seconds.")
var arvLogger *logger.Logger
if logEventType != "" {
arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
- EventType: logEventType,
+ EventType: logEventType,
MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
}
collectionChannel := make(chan collection.ReadCollections)
- go func() { collectionChannel <- collection.GetCollectionsAndSummarize(
- collection.GetCollectionsParams{
- Client: arv, Logger: arvLogger, BatchSize: 50}) }()
+ go func() {
+ collectionChannel <- collection.GetCollectionsAndSummarize(
+ collection.GetCollectionsParams{
+ Client: arv, Logger: arvLogger, BatchSize: 50})
+ }()
keepServerInfo := keep.GetKeepServersAndSummarize(
keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000})
readCollections := <-collectionChannel
- // Make compiler happy.
+ // Make compiler happy.
_ = readCollections
_ = keepServerInfo
// Log that we're finished
if arvLogger != nil {
- properties,_ := arvLogger.Edit()
+ properties, _ := arvLogger.Edit()
properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
// Force the recording, since go will not wait for the timer before exiting.
arvLogger.ForceRecord()
}
func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
- _ = entry // keep the compiler from complaining
+ _ = entry // keep the compiler from complaining
runInfo := properties["run_info"].(map[string]interface{})
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/sdk/go/util"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
- "log"
"io/ioutil"
+ "log"
"net/http"
"strconv"
"strings"
type ServerAddress struct {
Host string `json:"service_host"`
- Port int `json:"service_port"`
+ Port int `json:"service_port"`
}
// Info about a particular block returned by the server
type BlockInfo struct {
- Digest blockdigest.BlockDigest
- Size int
- Mtime int // TODO(misha): Replace this with a timestamp.
+ Digest blockdigest.BlockDigest
+ Size int
+ Mtime int // TODO(misha): Replace this with a timestamp.
}
// Info about a specified block given by a server
type BlockServerInfo struct {
ServerIndex int
Size int
- Mtime int // TODO(misha): Replace this with a timestamp.
+ Mtime int // TODO(misha): Replace this with a timestamp.
}
type ServerContents struct {
}
type ServerResponse struct {
- Address ServerAddress
+ Address ServerAddress
Contents ServerContents
}
type ReadServers struct {
- ReadAllServers bool
- KeepServerIndexToAddress []ServerAddress
- KeepServerAddressToIndex map[ServerAddress]int
- ServerToContents map[ServerAddress]ServerContents
- BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo
- BlockReplicationCounts map[int]int
+ ReadAllServers bool
+ KeepServerIndexToAddress []ServerAddress
+ KeepServerAddressToIndex map[ServerAddress]int
+ ServerToContents map[ServerAddress]ServerContents
+ BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo
+ BlockReplicationCounts map[int]int
}
type GetKeepServersParams struct {
Client arvadosclient.ArvadosClient
Logger *logger.Logger
- Limit int
+ Limit int
}
type KeepServiceList struct {
- ItemsAvailable int `json:"items_available"`
- KeepServers []ServerAddress `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ KeepServers []ServerAddress `json:"items"`
}
// Methods to implement util.SdkListResponse Interface
var (
// Don't access the token directly, use getDataManagerToken() to
// make sure it's been read.
- dataManagerToken string
- dataManagerTokenFile string
- dataManagerTokenFileReadOnce sync.Once
+ dataManagerToken string
+ dataManagerTokenFile string
+ dataManagerTokenFileReadOnce sync.Once
)
func init() {
- flag.StringVar(&dataManagerTokenFile,
+ flag.StringVar(&dataManagerTokenFile,
"data-manager-token-file",
"",
"File with the API token we should use to contact keep servers.")
}
-func getDataManagerToken(arvLogger *logger.Logger) (string) {
- readDataManagerToken := func () {
+func getDataManagerToken(arvLogger *logger.Logger) string {
+ readDataManagerToken := func() {
if dataManagerTokenFile == "" {
flag.Usage()
loggerutil.FatalWithMessage(arvLogger,
results.ReadAllServers, numReceived, numAvailable =
util.ContainsAllAvailableItems(sdkResponse)
- if (!results.ReadAllServers) {
+ if !results.ReadAllServers {
log.Printf("ERROR: Did not receive all keep server addresses.")
}
log.Printf("Received %d of %d available keep server addresses.",
}
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
+ properties, _ := params.Logger.Edit()
keepInfo := make(map[string]interface{})
keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
// Read all the responses
for i := range sdkResponse.KeepServers {
- _ = i // Here to prevent go from complaining.
- response := <- responseChan
+ _ = i // Here to prevent go from complaining.
+ response := <-responseChan
log.Printf("Received channel response from %v containing %d files",
response.Address,
len(response.Contents.BlockDigestToInfo))
results.BlockToServers[blockInfo.Digest] = append(
results.BlockToServers[blockInfo.Digest],
BlockServerInfo{ServerIndex: serverIndex,
- Size: blockInfo.Size,
+ Size: blockInfo.Size,
Mtime: blockInfo.Mtime})
}
}
func GetServerContents(arvLogger *logger.Logger,
keepServer ServerAddress,
client http.Client,
- responseChan chan<- ServerResponse) () {
+ responseChan chan<- ServerResponse) {
// Create and send request.
url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
log.Println("About to fetch keep server contents from " + url)
// the case of a size tie.
if storedBlock.Size < blockInfo.Size ||
(storedBlock.Size == blockInfo.Size &&
- storedBlock.Mtime < blockInfo.Mtime) {
+ storedBlock.Mtime < blockInfo.Mtime) {
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
}
} else {
if err := scanner.Err(); err != nil {
log.Fatalf("Received error scanning response from %s: %v", url, err)
} else {
- log.Printf("%s contained %d lines with %d duplicates with " +
+ log.Printf("%s contained %d lines with %d duplicates with "+
"%d size disagreements",
url,
numLines,
func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
tokens := strings.Fields(indexLine)
if len(tokens) != 2 {
- err = fmt.Errorf("Expected 2 tokens per line but received a " +
+ err = fmt.Errorf("Expected 2 tokens per line but received a "+
"line containing %v instead.",
tokens)
}
return
}
if len(locator.Hints) > 0 {
- err = fmt.Errorf("Block locator in index line should not contain hints " +
+ err = fmt.Errorf("Block locator in index line should not contain hints "+
"but it does: %v",
locator)
return
// for the lock you're already holding.
func FatalWithMessage(arvLogger *logger.Logger, message string) {
if arvLogger != nil {
- properties,_ := arvLogger.Edit()
+ properties, _ := arvLogger.Edit()
properties["FATAL"] = message
properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
arvLogger.ForceRecord()
log.Fatalf(message)
}
-