7492: cleanup error checking in keepproxy
[arvados.git] / services / datamanager / experimental / datamanager.py
index 04488231d9ec2c00156b52655d92ec4d7c289051..8207bdcd5cb77b232b5fa582d9b487f08a98251b 100755 (executable)
@@ -4,6 +4,7 @@ import arvados
 
 import argparse
 import cgi
+import csv
 import json
 import logging
 import math
@@ -27,6 +28,14 @@ def fileSizeFormat(value):
   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                          byteunits[exponent])
 
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the nearest 0.01.
+
+e.g. percentageFloor(0.941354) = 0.94
+"""
+  return math.floor(x*100) / 100.0
+
+
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
@@ -326,14 +335,15 @@ def logUserStorageUsage():
     # the object_type field since we don't know which we have.
     body['object_uuid'] = user
     body['event_type'] = args.user_storage_log_event_type
-    info = {}
-    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
-    info['persisted_collections_total_bytes'] = (
+    properties = {}
+    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    properties['read_collections_weighted_bytes'] = (
+      usage[WEIGHTED_READ_SIZE_COL])
+    properties['persisted_collections_total_bytes'] = (
       usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    info['persisted_collections_weighted_bytes'] = (
+    properties['persisted_collections_weighted_bytes'] = (
       usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = info
+    body['properties'] = properties
     # TODO(misha): Confirm that this will throw an exception if it
     # fails to create the log entry.
     arv.logs().create(body=body).execute()
@@ -348,9 +358,12 @@ def getKeepBlocks(keep_servers):
   blocks = []
   for host,port in keep_servers:
     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    blocks.append([line.split(' ')
-                   for line in response.read().split('\n')
-                   if line])
+    server_blocks = [line.split(' ')
+                     for line in response.read().split('\n')
+                     if line]
+    server_blocks = [(block_id, int(mtime))
+                     for block_id, mtime in server_blocks]
+    blocks.append(server_blocks)
   return blocks
 
 def getKeepStats(keep_servers):
@@ -405,12 +418,49 @@ def computeGarbageCollectionCandidates():
        mtime,
        disk_size,
        cumulative_disk_size,
-       float(free_keep_space - cumulative_disk_size)/total_keep_space))
+       float(free_keep_space + cumulative_disk_size)/total_keep_space))
 
   print 'The oldest Garbage Collection Candidates: '
   pprint.pprint(garbage_collection_report[:20])
 
 
+def outputGarbageCollectionReport(filename):
+  with open(filename, 'wb') as csvfile:
+    gcwriter = csv.writer(csvfile)
+    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+                       'cumulative size', 'disk free'])
+    for line in garbage_collection_report:
+      gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+  # TODO(misha): Modify this to allow users to specify the number of
+  # histogram buckets through a flag.
+  histogram = []
+  last_percentage = -1
+  for _,mtime,_,_,disk_free in garbage_collection_report:
+    curr_percentage = percentageFloor(disk_free)
+    if curr_percentage > last_percentage:
+      histogram.append( (mtime, curr_percentage) )
+    last_percentage = curr_percentage
+
+  log.info('Garbage collection histogram is: %s', histogram)
+
+  return histogram
+
+
+def logGarbageCollectionHistogram():
+  body = {}
+  # TODO(misha): Decide whether we should specify an object_uuid in
+  # the body and if so, which uuid to use.
+  body['event_type'] = args.block_age_free_space_histogram_log_event_type
+  properties = {}
+  properties['histogram'] = garbage_collection_histogram
+  body['properties'] = properties
+  # TODO(misha): Confirm that this will throw an exception if it
+  # fails to create the log entry.
+  arv.logs().create(body=body).execute()
+
+
 def detectReplicationProblems():
   blocks_not_in_any_collections.update(
     set(block_to_replication.keys()).difference(block_to_collections.keys()))
@@ -491,6 +541,14 @@ parser.add_argument('--user-storage-log-event-type',
                     default='user-storage-report',
                     help=('The event type to set when logging user '
                           'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+                    default='block-age-free-space-histogram',
+                    help=('The event type to set when logging user '
+                          'storage usage to workbench.'))
+parser.add_argument('--garbage-collection-file',
+                    default='',
+                    help=('The file to write a garbage collection report, or '
+                          'leave empty for no report.'))
 
 args = None
 
@@ -544,10 +602,20 @@ multiplied by current replication level)
 * cumulative disk size: The sum of this block's disk size and all the
 blocks listed above it
 * disk free: The proportion of our disk space that would be free if we
-deleted this block and all the above. So this is (free disk space -
+deleted this block and all the above. So this is (free disk space +
 cumulative disk size) / total disk capacity
 """
 
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-persisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
+"""
+
 # Stuff to report on
 blocks_not_in_any_collections = set()
 underreplicated_persisted_blocks = set()
@@ -596,6 +664,11 @@ def loadAllData():
   total_keep_space = sum(map(itemgetter(0), keep_stats))
   free_keep_space = sum(map(itemgetter(1), keep_stats))
 
+  # TODO(misha): Delete this hack when the keep servers are fixed!
+  # This hack deals with the fact that keep servers report each other's disks.
+  total_keep_space /= len(keep_stats)
+  free_keep_space /= len(keep_stats)
+
   log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
            (fileSizeFormat(total_keep_space),
             fileSizeFormat(free_keep_space),
@@ -603,12 +676,23 @@ def loadAllData():
 
   computeReplication(keep_blocks)
 
-  computeGarbageCollectionCandidates()
-
   log.info('average replication level is %f',
            (float(sum(block_to_replication.values())) /
             len(block_to_replication)))
 
+  computeGarbageCollectionCandidates()
+
+  if args.garbage_collection_file:
+    log.info('Writing garbage Collection report to %s',
+             args.garbage_collection_file)
+    outputGarbageCollectionReport(args.garbage_collection_file)
+
+  global garbage_collection_histogram
+  garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+  if args.log_to_workbench:
+    logGarbageCollectionHistogram()
+
   detectReplicationProblems()
 
   computeUserStorageUsage()