9427: Back out redundant connection pool configuration option and add comment
[arvados.git] / services / datamanager / experimental / datamanager.py
index 08e688e8ac974f92af179d6a1da984b98f14804a..8207bdcd5cb77b232b5fa582d9b487f08a98251b 100755 (executable)
@@ -4,6 +4,8 @@ import arvados
 
 import argparse
 import cgi
+import csv
+import json
 import logging
 import math
 import pprint
@@ -26,6 +28,14 @@ def fileSizeFormat(value):
   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                          byteunits[exponent])
 
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the neared 0.01.
+
+e.g. precentageFloor(0.941354) = 0.94
+"""
+  return math.floor(x*100) / 100.0
+
+
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
@@ -76,7 +86,7 @@ class CollectionInfo:
     if not CollectionInfo.all_by_uuid.has_key(uuid):
       CollectionInfo(uuid)
     return CollectionInfo.all_by_uuid[uuid]
-  
+
 
 def extractUuid(candidate):
   """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
@@ -325,14 +335,15 @@ def logUserStorageUsage():
     # the object_type field since we don't know which we have.
     body['object_uuid'] = user
     body['event_type'] = args.user_storage_log_event_type
-    info = {}
-    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
-    info['persisted_collections_total_bytes'] = (
+    properties = {}
+    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    properties['read_collections_weighted_bytes'] = (
+      usage[WEIGHTED_READ_SIZE_COL])
+    properties['persisted_collections_total_bytes'] = (
       usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    info['persisted_collections_weighted_bytes'] = (
+    properties['persisted_collections_weighted_bytes'] = (
       usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = info
+    body['properties'] = properties
     # TODO(misha): Confirm that this will throw an exception if it
     # fails to create the log entry.
     arv.logs().create(body=body).execute()
@@ -347,11 +358,37 @@ def getKeepBlocks(keep_servers):
   blocks = []
   for host,port in keep_servers:
     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    blocks.append([line.split(' ')
-                   for line in response.read().split('\n')
-                   if line])
+    server_blocks = [line.split(' ')
+                     for line in response.read().split('\n')
+                     if line]
+    server_blocks = [(block_id, int(mtime))
+                     for block_id, mtime in server_blocks]
+    blocks.append(server_blocks)
   return blocks
 
+def getKeepStats(keep_servers):
+  MOUNT_COLUMN = 5
+  TOTAL_COLUMN = 1
+  FREE_COLUMN = 3
+  DISK_BLOCK_SIZE = 1024
+  stats = []
+  for host,port in keep_servers:
+    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+    parsed_json = json.load(response)
+    df_entries = [line.split()
+                  for line in parsed_json['df'].split('\n')
+                  if line]
+    keep_volumes = [columns
+                    for columns in df_entries
+                    if 'keep' in columns[MOUNT_COLUMN]]
+    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+                                                  keep_volumes)))
+    free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+                                                  keep_volumes)))
+    stats.append([total_space, free_space])
+  return stats
+
 
 def computeReplication(keep_blocks):
   for server_blocks in keep_blocks:
@@ -360,6 +397,70 @@ def computeReplication(keep_blocks):
   log.debug('Seeing the following replication levels among blocks: %s',
             str(set(block_to_replication.values())))
 
+
+def computeGarbageCollectionCandidates():
+  for server_blocks in keep_blocks:
+    block_to_latest_mtime.addValues(server_blocks)
+  empty_set = set()
+  garbage_collection_priority = sorted(
+    [(block,mtime)
+     for block,mtime in block_to_latest_mtime.items()
+     if len(block_to_persisters.get(block,empty_set)) == 0],
+    key = itemgetter(1))
+  global garbage_collection_report
+  garbage_collection_report = []
+  cumulative_disk_size = 0
+  for block,mtime in garbage_collection_priority:
+    disk_size = blockDiskUsage(block)
+    cumulative_disk_size += disk_size
+    garbage_collection_report.append(
+      (block,
+       mtime,
+       disk_size,
+       cumulative_disk_size,
+       float(free_keep_space + cumulative_disk_size)/total_keep_space))
+
+  print 'The oldest Garbage Collection Candidates: '
+  pprint.pprint(garbage_collection_report[:20])
+
+
+def outputGarbageCollectionReport(filename):
+  with open(filename, 'wb') as csvfile:
+    gcwriter = csv.writer(csvfile)
+    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+                       'cumulative size', 'disk free'])
+    for line in garbage_collection_report:
+      gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+  # TODO(misha): Modify this to allow users to specify the number of
+  # histogram buckets through a flag.
+  histogram = []
+  last_percentage = -1
+  for _,mtime,_,_,disk_free in garbage_collection_report:
+    curr_percentage = percentageFloor(disk_free)
+    if curr_percentage > last_percentage:
+      histogram.append( (mtime, curr_percentage) )
+    last_percentage = curr_percentage
+
+  log.info('Garbage collection histogram is: %s', histogram)
+
+  return histogram
+
+
+def logGarbageCollectionHistogram():
+  body = {}
+  # TODO(misha): Decide whether we should specify an object_uuid in
+  # the body and if so, which uuid to use.
+  body['event_type'] = args.block_age_free_space_histogram_log_event_type
+  properties = {}
+  properties['histogram'] = garbage_collection_histogram
+  body['properties'] = properties
+  # TODO(misha): Confirm that this will throw an exception if it
+  # fails to create the log entry.
+  arv.logs().create(body=body).execute()
+
+
 def detectReplicationProblems():
   blocks_not_in_any_collections.update(
     set(block_to_replication.keys()).difference(block_to_collections.keys()))
@@ -440,6 +541,14 @@ parser.add_argument('--user-storage-log-event-type',
                     default='user-storage-report',
                     help=('The event type to set when logging user '
                           'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+                    default='block-age-free-space-histogram',
+                    help=('The event type to set when logging user '
+                          'storage usage to workbench.'))
+parser.add_argument('--garbage-collection-file',
+                    default='',
+                    help=('The file to write a garbage collection report, or '
+                          'leave empty for no report.'))
 
 args = None
 
@@ -473,7 +582,39 @@ user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
 
 keep_servers = []
 keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space =  0
+
 block_to_replication = defaultdict(lambda: 0)
+block_to_latest_mtime = maxdict()
+
+garbage_collection_report = []
+"""A list of non-persisted blocks, sorted by increasing mtime
+
+Each entry is of the form (block uuid, latest mtime, disk size,
+cumulative size)
+
+* block uuid: The id of the block we want to delete
+* latest mtime: The latest mtime of the block across all keep servers.
+* disk size: The total disk space used by this block (block size
+multiplied by current replication level)
+* cumulative disk size: The sum of this block's disk size and all the
+blocks listed above it
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-presisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
+"""
 
 # Stuff to report on
 blocks_not_in_any_collections = set()
@@ -516,12 +657,42 @@ def loadAllData():
   global keep_blocks
   keep_blocks = getKeepBlocks(keep_servers)
 
+  log.info('Getting Stats from each Keep Server.')
+  global keep_stats, total_keep_space, free_keep_space
+  keep_stats = getKeepStats(keep_servers)
+
+  total_keep_space = sum(map(itemgetter(0), keep_stats))
+  free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+  # TODO(misha): Delete this hack when the keep servers are fixed!
+  # This hack deals with the fact that keep servers report each other's disks.
+  total_keep_space /= len(keep_stats)
+  free_keep_space /= len(keep_stats)
+
+  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+           (fileSizeFormat(total_keep_space),
+            fileSizeFormat(free_keep_space),
+            100*free_keep_space/total_keep_space))
+
   computeReplication(keep_blocks)
 
   log.info('average replication level is %f',
            (float(sum(block_to_replication.values())) /
             len(block_to_replication)))
 
+  computeGarbageCollectionCandidates()
+
+  if args.garbage_collection_file:
+    log.info('Writing garbage Collection report to %s',
+             args.garbage_collection_file)
+    outputGarbageCollectionReport(args.garbage_collection_file)
+
+  global garbage_collection_histogram
+  garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+  if args.log_to_workbench:
+    logGarbageCollectionHistogram()
+
   detectReplicationProblems()
 
   computeUserStorageUsage()
@@ -555,10 +726,10 @@ class DataManagerHandler(BaseHTTPRequestHandler):
 
   def writeTop(self, title):
     self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
-    
+
   def writeBottom(self):
     self.wfile.write('</BODY></HTML>\n')
-    
+
   def writeHomePage(self):
     self.send_response(200)
     self.end_headers()
@@ -676,7 +847,7 @@ class DataManagerHandler(BaseHTTPRequestHandler):
         blocks = replication_to_blocks[replication_level]
         self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
       self.wfile.write('</TR></TABLE>\n')
-      
+
 
   def do_GET(self):
     if not all_data_loaded: