Skip to content
This repository has been archived by the owner on Apr 1, 2021. It is now read-only.

Make provision for passing custom serialisers in woof producer #15

Merged
merged 2 commits into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions woof/partitioned_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def __init__(self, broker,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, # unused - here for legacy support
retries=3,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
**kwargs):

try:
Expand All @@ -39,8 +41,8 @@ def __init__(self, broker,
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
self.prod = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
key_serializer=key_serializer,
value_serializer=value_serializer,
batch_size=batch_send_every_n,
retries=retries,
partitioner=_partitioner,
Expand Down Expand Up @@ -106,16 +108,22 @@ class CyclicPartitionedProducer(KafkaProducer):
use send() to send to any topic and distribute keys cyclically in partitions
"""

def __init__(self, broker, async=True, random_start=True, **kwargs):
def __init__(self,
broker,
async=True,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
random_start=True,
**kwargs):
self.partition_cycles = {}
self.random_start = random_start
self.async = async
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
super(CyclicPartitionedProducer, self).__init__(
bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
key_serializer=key_serializer,
value_serializer=value_serializer,
**kwargs)

def _partition(self, topic, partition, key, value, serialized_key,
Expand Down
12 changes: 9 additions & 3 deletions woof/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@ class FeedProducer(object):
use send() to send to any topic
"""

def __init__(self, broker, retries=3, async=False, **kwargs):
def __init__(self,
broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
retries=3,
async=False,
**kwargs):
try:
kwargs['api_version'] = kwargs.get('api_version',
CURRENT_PROD_BROKER_VERSION)
self.prod = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
key_serializer=key_serializer,
value_serializer=value_serializer,
retries=retries,
**kwargs)
self.async = async
Expand Down
28 changes: 17 additions & 11 deletions woof/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,28 @@
log = logging.getLogger("woof")


def make_kafka_safe(raw_data):
    """
    Coerce *raw_data* into an ASCII-only byte string safe to send to Kafka.

    Non-unicode values are stringified with ``str()`` and decoded as UTF-8
    first; the result is then encoded to ASCII with ``'ignore'``, silently
    dropping any characters that cannot be represented.

    NOTE(review): Python 2 only -- relies on the ``unicode`` builtin and on
    ``str.decode``, neither of which exists on Python 3.

    :param raw_data: any value (str, unicode, int, ...) to be serialized
    :returns: an ASCII-encoded byte string
    :raises UnicodeDecodeError: if a non-unicode value is not valid UTF-8
    """
    # isinstance (rather than `type(...) != unicode`) also accepts unicode
    # subclasses, which would otherwise be round-tripped through str() and
    # could raise on non-ASCII content.
    if not isinstance(raw_data, unicode):
        raw_data = str(raw_data)
        raw_data = raw_data.decode('utf-8')
    # Both branches of the original ended in the same encode call; a single
    # exit point expresses that.
    return raw_data.encode('ascii', 'ignore')


class TransactionLogger(object):
def __init__(self,
broker,
vertical,
host=socket.gethostname(),
async=False,
retries=1,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
**kwargs):
self.broker = broker
self.this_host = host
Expand All @@ -27,8 +42,8 @@ def __init__(self,
# thread-safe producer; uses the murmur2 partitioner by default
# good for us
self.producer = KafkaProducer(bootstrap_servers=broker,
key_serializer=make_kafka_safe,
value_serializer=make_kafka_safe,
key_serializer=key_serializer,
value_serializer=value_serializer,
retries=retries,
**kwargs)

Expand Down Expand Up @@ -135,12 +150,3 @@ def _format_message(self, verb, txn_id, amount, skus, detail, userid,

def _get_topic_from_vertical(vertical):
return "_".join(["TRANSACTIONS", vertical])


# NOTE(review): this is the deletion side of the diff -- a duplicate of the
# make_kafka_safe moved to the top of woof/transactions.py in this PR (the
# relocated copy gained a docstring). Original indentation was lost by the
# page scrape. Python 2 only: depends on the `unicode` builtin.
def make_kafka_safe(raw_data):
if type(raw_data) != unicode:
raw_data = str(raw_data)
raw_data = raw_data.decode('utf-8')
return raw_data.encode('ascii', 'ignore')
else:
return raw_data.encode('ascii', 'ignore')