
Commit

0.2.7 version
supriyopaul committed Nov 6, 2018
1 parent 5dc4768 commit bb65f71
Showing 1 changed file with 94 additions and 4 deletions.
98 changes: 94 additions & 4 deletions deeputil/streamcounter.py
@@ -2,9 +2,16 @@

class StreamCounter(object):
'''
A class whose responsibility is to count the items
in data coming in as a stream.
'''
#TODO Doctests and examples
# When we receive a stream of data, we fix the maximum size of a chunk.
# Think of a chunk as a container that can only hold a fixed number of items.
# This helps us keep control over RAM usage.
DEFAULT_CHUNK_SIZE = 1000000
# We also keep a running count of item occurrences across the chunks.
# Max counts bounds how much counting state we retain; once it is exceeded,
# the oldest chunk is dropped (see _drop_oldest_chunk below).
DEFAULT_MAX_COUNTS = 1000000
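# For example, StreamCounter(10000, 50000) would cap each chunk at 10000 items
# and bound the retained counting state at 50000 (illustrative values in the
# positional style used by the doctests below, not defaults of this module).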

def __init__(self, chunk_size=DEFAULT_CHUNK_SIZE,
@@ -38,9 +45,40 @@ def __init__(self, chunk_size=DEFAULT_CHUNK_SIZE,
self.counts_total = 0
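# A minimal sketch of the state the constructor is expected to set up, inferred
# from the attributes used by the doctests below (the actual assignments sit in
# the collapsed lines of this diff, and the second parameter name is assumed):
#
#     self.chunk_size = chunk_size        # max no. of items per chunk
#     self.max_counts = max_counts        # capacity before old chunks are dropped
#     self.n_items_seen = 0               # items seen across the whole stream
#     self.n_chunk_items_seen = 0         # items seen in the current chunk
#     self.n_chunks = 0                   # id of the current chunk
#     self.chunked_counts = {}            # chunk_id -> {item: count}
#     self.counts = {}                    # item -> count over the kept chunks
#     self.counts_total = 0               # sum of all kept counts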

def add(self, item, count=1):
self.counts[item] += count
self.counts_total += count

'''
When we receive a stream of data, we add the items to the current
chunk, which has a limit on the number of items it will store.
>>> s = StreamCounter(5,5)
>>> data_stream = ['a','b','c','d']
>>> for item in data_stream:
... s.add(item)
>>> s.chunk_size
5
>>> s.n_items_seen
4
>>> s.n_chunk_items_seen
4
>>> s.n_chunks
0
>>> from pprint import pprint
>>> pprint(s.chunked_counts.get(s.n_chunks, {}))
{'a': 1, 'b': 1, 'c': 1, 'd': 1}
>>> s.counts_total
4
>>> data_stream = ['a','b','c','d','e','f','g','e']
>>> for item in data_stream:
... s.add(item)
>>> s.chunk_size
5
>>> s.n_items_seen
12
>>> s.n_chunk_items_seen
2
>>> s.n_chunks
2
>>> s.chunked_counts.get(s.n_chunks, {})
{'g': 1, 'e': 1}
'''
self.n_items_seen += count
self.n_chunk_items_seen += count

@@ -67,6 +105,27 @@ def add(self, item, count=1):
self._drop_oldest_chunk()
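# A minimal sketch of the bookkeeping add() appears to perform between the
# counters above and the _drop_oldest_chunk() call, assuming the collapsed
# lines behave as the doctests require (the exact statements, and the
# capacity condition that triggers the drop, are not visible in this diff):
#
#     # count the item globally and inside the current chunk
#     self.counts[item] = self.counts.get(item, 0) + count
#     self.counts_total += count
#     chunk = self.chunked_counts.setdefault(self.n_chunks, {})
#     chunk[item] = chunk.get(item, 0) + count
#
#     # once the current chunk is full, start a new one
#     if self.n_chunk_items_seen >= self.chunk_size:
#         self.n_chunks += 1
#         self.n_chunk_items_seen = 0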

def _drop_oldest_chunk(self):
'''
Handles the case where the items coming in exceed the capacity we are
willing to track. The intent is to drop the oldest chunk so that new
items can keep flowing in.
>>> s = StreamCounter(5,5)
>>> data_stream = ['a','b','c','d']
>>> for item in data_stream:
... s.add(item)
>>> min(s.chunked_counts.keys())
0
>>> s.chunked_counts
{0: {'a': 1, 'b': 1, 'c': 1, 'd': 1}}
>>> data_stream = ['a','b','c','d','a','e','f']
>>> for item in data_stream:
... s.add(item)
>>> min(s.chunked_counts.keys())
2
>>> s.chunked_counts
{2: {'f': 1}}
'''
chunk_id = min(self.chunked_counts.keys())
chunk = self.chunked_counts.pop(chunk_id)

self.counts_total -= v
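# A minimal sketch of the collapsed part of this method, assuming it walks the
# dropped chunk and subtracts its counts from the running totals, consistent
# with the doctests above (the exact lines are not visible in this diff):
#
#     for k, v in chunk.items():
#         self.counts[k] -= v
#         if self.counts[k] <= 0:
#             # forget items that only appeared in the dropped chunk
#             del self.counts[k]
#         self.counts_total -= v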

def get(self, item, default=0, normalized=False):
'''
Once the stream of data has been pushed into chunks, we can
retrieve the count of an item using this method.
>>> stream_counter_obj = StreamCounter(5,5)
>>> data_stream = ['a','b','c']
>>> for item in data_stream:
... stream_counter_obj.add(item)
>>> stream_counter_obj.get('a')
1
>>> stream_counter_obj.get('b')
1
>>> stream_counter_obj.get('c')
1
>>> stream_counter_obj.get('d')
0
>>> data_stream.extend(['d','e','f'])
>>> for item in data_stream:
... stream_counter_obj.add(item)
>>> stream_counter_obj.get('a')
0
>>> stream_counter_obj.get('b')
0
>>> stream_counter_obj.get('c')
1
>>> stream_counter_obj.get('d')
1
>>> stream_counter_obj.get('e')
1
>>> stream_counter_obj.get('f')
1
'''
c = self.counts.get(item, default)
if not normalized:
return c
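# A minimal sketch of the collapsed normalized branch, assuming the normalized
# value is the item's count divided by the total of all retained counts (the
# exact expression is not visible in this diff):
#
#     return c / float(self.counts_total)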
