Skip to content

Commit

Permalink
Initial attempt to fix resource usage
Browse files Browse the repository at this point in the history
Reference counting is now done manually, but it seems that things can
still go wrong at least during testing
  • Loading branch information
Byron committed Jan 7, 2015
1 parent e0b0bec commit 70fae1f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 49 deletions.
6 changes: 6 additions & 0 deletions doc/source/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
Changelog
#########

**********
v0.8.6
**********
- Fixed issue with resources never being freed as mmaps were never closed.
- Client counting is now done manually, instead of relying on pyton's reference count

**********
v0.8.5
**********
Expand Down
27 changes: 13 additions & 14 deletions smmap/mman.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
buffer,
)

from weakref import ref
import sys
from functools import reduce

Expand Down Expand Up @@ -55,8 +54,7 @@ def _destroy(self):
# Actual client count, which doesn't include the reference kept by the manager, nor ours
# as we are about to be deleted
try:
num_clients = self._rlist.client_count() - 2
if num_clients == 0 and len(self._rlist) == 0:
if len(self._rlist) == 0:
# Free all resources associated with the mapped file
self._manager._fdict.pop(self._rlist.path_or_fd())
# END remove regions list from manager
Expand All @@ -78,7 +76,7 @@ def _copy_from(self, rhs):
self._size = rhs._size

if self._region is not None:
self._region.increment_usage_count()
self._region.increment_client_count()
# END handle regions

def __copy__(self):
Expand Down Expand Up @@ -126,20 +124,22 @@ def use_region(self, offset=0, size=0, flags=0):

if need_region:
self._region = man._obtain_region(self._rlist, offset, size, flags, False)
self._region.increment_client_count()
# END need region handling

self._region.increment_usage_count()
self._ofs = offset - self._region._b
self._size = min(size, self._region.ofs_end() - offset)

return self

def unuse_region(self):
"""Unuse the ucrrent region. Does nothing if we have no current region
"""Unuse the current region. Does nothing if we have no current region
**Note:** the cursor unuses the region automatically upon destruction. It is recommended
to un-use the region once you are done reading from it in persistent cursors as it
helps to free up resource more quickly"""
if self._region is not None:
self._region.increment_client_count(-1)
self._region = None
# note: should reset ofs and size, but we spare that for performance. Its not
# allowed to query information if we are not valid !
Expand Down Expand Up @@ -184,12 +184,10 @@ def size(self):
""":return: amount of bytes we point to"""
return self._size

def region_ref(self):
""":return: weak ref to our mapped region.
def region(self):
""":return: our mapped region, or None if nothing is mapped yet
:raise AssertionError: if we have no current region. This is only useful for debugging"""
if self._region is None:
raise AssertionError("region not set")
return ref(self._region)
return self._region

def includes_ofs(self, ofs):
""":return: True if the given absolute offset is contained in the cursors
Expand Down Expand Up @@ -311,8 +309,8 @@ def _collect_lru_region(self, size):
lru_list = None
for regions in self._fdict.values():
for region in regions:
# check client count - consider that we keep one reference ourselves !
if (region.client_count() - 2 == 0 and
# check client count - if it's 1, it's just us
if (region.client_count() == 1 and
(lru_region is None or region._uc < lru_region._uc)):
lru_region = region
lru_list = regions
Expand All @@ -326,6 +324,7 @@ def _collect_lru_region(self, size):

num_found += 1
del(lru_list[lru_list.index(lru_region)])
lru_region.increment_client_count(-1)
self._memory_size -= lru_region.size()
self._handle_count -= 1
# END while there is more memory to free
Expand Down Expand Up @@ -449,7 +448,7 @@ def force_map_handle_removal_win(self, base_path):
for path, rlist in self._fdict.items():
if path.startswith(base_path):
for region in rlist:
region._mf.close()
region.release()
num_closed += 1
# END path matches
# END for each path
Expand Down
26 changes: 13 additions & 13 deletions smmap/test/test_mman.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,29 @@ def test_memman_operation(self):
# window size is 0 for static managers, hence size will be 0. We take that into consideration
size = man.window_size() // 2
assert c.use_region(base_offset, size).is_valid()
rr = c.region_ref()
assert rr().client_count() == 2 # the manager and the cursor and us
rr = c.region()
assert rr.client_count() == 2 # the manager and the cursor and us

assert man.num_open_files() == 1
assert man.num_file_handles() == 1
assert man.mapped_memory_size() == rr().size()
assert man.mapped_memory_size() == rr.size()

# assert c.size() == size # the cursor may overallocate in its static version
assert c.ofs_begin() == base_offset
assert rr().ofs_begin() == 0 # it was aligned and expanded
assert rr.ofs_begin() == 0 # it was aligned and expanded
if man.window_size():
# but isn't larger than the max window (aligned)
assert rr().size() == align_to_mmap(man.window_size(), True)
assert rr.size() == align_to_mmap(man.window_size(), True)
else:
assert rr().size() == fc.size
assert rr.size() == fc.size
# END ignore static managers which dont use windows and are aligned to file boundaries

assert c.buffer()[:] == data[base_offset:base_offset + (size or c.size())]

# obtain second window, which spans the first part of the file - it is a still the same window
nsize = (size or fc.size) - 10
assert c.use_region(0, nsize).is_valid()
assert c.region_ref()() == rr()
assert c.region() == rr
assert man.num_file_handles() == 1
assert c.size() == nsize
assert c.ofs_begin() == 0
Expand All @@ -154,15 +154,15 @@ def test_memman_operation(self):
if man.window_size():
assert man.num_file_handles() == 2
assert c.size() < size
assert c.region_ref()() is not rr() # old region is still available, but has not curser ref anymore
assert rr().client_count() == 1 # only held by manager
assert c.region() is not rr # old region is still available, but has not curser ref anymore
assert rr.client_count() == 1 # only held by manager
else:
assert c.size() < fc.size
# END ignore static managers which only have one handle per file
rr = c.region_ref()
assert rr().client_count() == 2 # manager + cursor
assert rr().ofs_begin() < c.ofs_begin() # it should have extended itself to the left
assert rr().ofs_end() <= fc.size # it cannot be larger than the file
rr = c.region()
assert rr.client_count() == 2 # manager + cursor
assert rr.ofs_begin() < c.ofs_begin() # it should have extended itself to the left
assert rr.ofs_end() <= fc.size # it cannot be larger than the file
assert c.buffer()[:] == data[base_offset:base_offset + (size or c.size())]

# unising a region makes the cursor invalid
Expand Down
9 changes: 1 addition & 8 deletions smmap/test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,7 @@ def test_region(self):
# auto-refcount
assert rfull.client_count() == 1
rfull2 = rfull
assert rfull.client_count() == 2

# usage
assert rfull.usage_count() == 0
rfull.increment_usage_count()
assert rfull.usage_count() == 1
assert rfull.client_count() == 1, "no auto-counting"

# window constructor
w = MapWindow.from_region(rfull)
Expand All @@ -106,8 +101,6 @@ def test_region_list(self):
for item in (fc.path, fd):
ml = MapRegionList(item)

assert ml.client_count() == 1

assert len(ml) == 0
assert ml.path_or_fd() == item
assert ml.file_size() == fc.size
Expand Down
33 changes: 19 additions & 14 deletions smmap/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def __init__(self, path_or_fd, ofs, size, flags=0):
os.close(fd)
# END only close it if we opened it
# END close file handle
# We assume the first one to use us keeps us around
self.increment_client_count()

def _read_into_memory(self, fd, offset, size):
""":return: string data as read from the given file descriptor, offset and size """
Expand Down Expand Up @@ -222,17 +224,25 @@ def includes_ofs(self, ofs):

def client_count(self):
""":return: number of clients currently using this region"""
from sys import getrefcount
# -1: self on stack, -1 self in this method, -1 self in getrefcount
return getrefcount(self) - 3

def usage_count(self):
""":return: amount of usages so far"""
return self._uc

def increment_usage_count(self):
"""Adjust the usage count by the given positive or negative offset"""
self._uc += 1
def increment_client_count(self, ofs = 1):
"""Adjust the usage count by the given positive or negative offset.
If usage count equals 0, we will auto-release our resources
:return: True if we released resources, False otherwise. In the latter case, we can still be used"""
self._uc += ofs
assert self._uc > -1, "Increments must match decrements, usage counter negative: %i" % self._uc

if self.client_count() == 0:
self.release()
return True
else:
return False
# end handle release

def release(self):
"""Release all resources this instance might hold. Must only be called if there usage_count() is zero"""
self._mf.close()

# re-define all methods which need offset adjustments in compatibility mode
if _need_compat_layer:
Expand Down Expand Up @@ -268,11 +278,6 @@ def __init__(self, path_or_fd):
self._path_or_fd = path_or_fd
self._file_size = None

def client_count(self):
""":return: amount of clients which hold a reference to this instance"""
from sys import getrefcount
return getrefcount(self) - 3

def path_or_fd(self):
""":return: path or file descriptor we are attached to"""
return self._path_or_fd
Expand Down

0 comments on commit 70fae1f

Please sign in to comment.