Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update built-in partitioner options to include murmur2 #396

Merged
merged 3 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 7 additions & 92 deletions confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
/**
 * @brief Per-message state carried as the msg_opaque through librdkafka,
 *        holding the Python callbacks to invoke for this message.
 */
struct Producer_msgstate {
        Handle *self;             /* Owning Producer handle (borrowed, not ref-counted). */
        PyObject *dr_cb;          /* Delivery-report callback (owned reference). */
        PyObject *partitioner_cb; /* Per-message Python partitioner (owned reference). */
};


Expand All @@ -58,32 +57,23 @@ struct Producer_msgstate {
*/
static __inline struct Producer_msgstate *
Producer_msgstate_new (Handle *self,
PyObject *dr_cb, PyObject *partitioner_cb) {
PyObject *dr_cb) {
struct Producer_msgstate *msgstate;

if (!dr_cb && !partitioner_cb)
return NULL;

msgstate = calloc(1, sizeof(*msgstate));
msgstate->self = self;

if (dr_cb) {
msgstate->dr_cb = dr_cb;
Py_INCREF(dr_cb);
}
if (partitioner_cb) {
msgstate->partitioner_cb = partitioner_cb;
Py_INCREF(partitioner_cb);
}
return msgstate;
}

/**
 * @brief Destroy a message state: drop the owned Python callback
 *        references and free the struct itself.
 *
 * NOTE(review): Py_DECREF requires the GIL — callers presumably hold it
 * (e.g. inside a PyGILState_Ensure section); confirm at each call site.
 */
static __inline void
Producer_msgstate_destroy (struct Producer_msgstate *msgstate) {
        if (msgstate->dr_cb)
                Py_DECREF(msgstate->dr_cb);
        if (msgstate->partitioner_cb)
                Py_DECREF(msgstate->partitioner_cb);
        free(msgstate);
}

Expand All @@ -93,10 +83,6 @@ static void Producer_clear0 (Handle *self) {
Py_DECREF(self->u.Producer.default_dr_cb);
self->u.Producer.default_dr_cb = NULL;
}
if (self->u.Producer.partitioner_cb) {
Py_DECREF(self->u.Producer.partitioner_cb);
self->u.Producer.partitioner_cb = NULL;
}
}

static int Producer_clear (Handle *self) {
Expand Down Expand Up @@ -128,8 +114,6 @@ static int Producer_traverse (Handle *self,
visitproc visit, void *arg) {
if (self->u.Producer.default_dr_cb)
Py_VISIT(self->u.Producer.default_dr_cb);
if (self->u.Producer.partitioner_cb)
Py_VISIT(self->u.Producer.partitioner_cb);

Handle_traverse(self, visit, arg);

Expand Down Expand Up @@ -191,71 +175,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
}


/**
 * @brief Trampoline bridging librdkafka's C partitioner callback to a
 *        per-message Python partitioner callback.
 *
 * FIXME: The partitioner is currently broken due to threading/GIL issues.
 *
 * @param rkt           Topic the message is being partitioned for.
 * @param keydata       Message key bytes (passed to the Python callback).
 * @param keylen        Length of \p keydata in bytes.
 * @param partition_cnt Number of partitions available for \p rkt.
 * @param rkt_opaque    The Producer Handle (set as topic opaque).
 * @param msg_opaque    Per-message struct Producer_msgstate, or NULL.
 *
 * @returns the partition chosen by the Python callback, the fallback C
 *          partitioner's choice, or RD_KAFKA_PARTITION_UA on error.
 */
int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
                                 const void *keydata,
                                 size_t keylen,
                                 int32_t partition_cnt,
                                 void *rkt_opaque, void *msg_opaque) {
        Handle *self = rkt_opaque;
        struct Producer_msgstate *msgstate = msg_opaque;
        PyGILState_STATE gstate;
        PyObject *result;
        PyObject *args;
        int32_t r = RD_KAFKA_PARTITION_UA; /* "unassigned" default on error */

        if (!msgstate) {
                /* Fall back on default C partitioner if neither a per-msg
                 * partitioner nor a default Python partitioner is available */
                return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen,
                                                         partition_cnt,
                                                         rkt_opaque, msg_opaque);
        }

        /* Acquire the GIL before touching any Python state below. */
        gstate = PyGILState_Ensure();

        if (!msgstate->partitioner_cb) {
                /* Fall back on default C partitioner if neither a per-msg
                 * partitioner nor a default Python partitioner is available */
                r = msgstate->self->u.Producer.c_partitioner_cb(rkt,
                                                                keydata, keylen,
                                                                partition_cnt,
                                                                rkt_opaque,
                                                                msg_opaque);
                goto done;
        }

        /* Build (key, partition_cnt) argument tuple for the Python callback. */
        args = Py_BuildValue("(s#l)",
                             (const char *)keydata, (int)keylen,
                             (long)partition_cnt);
        if (!args) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
                                 "Unable to build callback args");
                goto done;
        }


        result = PyObject_CallObject(msgstate->partitioner_cb, args);
        Py_DECREF(args);

        if (result) {
                /* Expect an integer partition number back from Python. */
                r = (int32_t)cfl_PyInt_AsInt(result);
                if (PyErr_Occurred())
                        printf("FIXME: partition_cb returned wrong type "
                               "(expected long), how to propagate?\n");
                Py_DECREF(result);
        } else {
                printf("FIXME: partitioner_cb crashed, how to propagate?\n");
        }

done:
        /* Release the GIL before returning into librdkafka. */
        PyGILState_Release(gstate);
        return r;
}


#if HAVE_PRODUCEV
static rd_kafka_resp_err_t
Producer_producev (Handle *self,
Expand Down Expand Up @@ -313,7 +232,7 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
const char *topic, *value = NULL, *key = NULL;
int value_len = 0, key_len = 0;
int partition = RD_KAFKA_PARTITION_UA;
PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL;
PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL;
long long timestamp = 0;
rd_kafka_resp_err_t err;
struct Producer_msgstate *msgstate;
Expand All @@ -327,17 +246,16 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
"partition",
"callback",
"on_delivery", /* Alias */
"partitioner",
"timestamp",
"headers",
NULL };

if (!PyArg_ParseTupleAndKeywords(args, kwargs,
"s|z#z#iOOOLO"
"s|z#z#iOOLO"
, kws,
&topic, &value, &value_len,
&key, &key_len, &partition,
&dr_cb, &dr_cb2, &partitioner_cb,
&dr_cb, &dr_cb2,
&timestamp, &headers))
return NULL;

Expand Down Expand Up @@ -376,13 +294,10 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,

if (!dr_cb || dr_cb == Py_None)
dr_cb = self->u.Producer.default_dr_cb;
if (!partitioner_cb || partitioner_cb == Py_None)
partitioner_cb = self->u.Producer.partitioner_cb;


/* Create msgstate if necessary, may return NULL if no callbacks
* are wanted. */
msgstate = Producer_msgstate_new(self, dr_cb, partitioner_cb);
msgstate = Producer_msgstate_new(self, dr_cb);

/* Produce message */
#if HAVE_PRODUCEV
Expand Down Expand Up @@ -503,8 +418,8 @@ static PyMethodDef Producer_methods[] = {
" :param str topic: Topic to produce message to\n"
" :param str|bytes value: Message payload\n"
" :param str|bytes key: Message key\n"
" :param int partition: Partition to produce to, elses uses the "
"configured partitioner.\n"
" :param int partition: Partition to produce to, else uses the "
"configured built-in partitioner.\n"
" :param func on_delivery(err,msg): Delivery report callback to call "
"(from :py:func:`poll()` or :py:func:`flush()`) on successful or "
"failed delivery\n"
Expand Down
67 changes: 0 additions & 67 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1451,70 +1451,6 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,

return 1;

} else if (!strcasecmp(name, "partitioner") ||
!strcasecmp(name, "partitioner_callback")) {

if ((vs = cfl_PyObject_Unistr(valobj))) {
/* Use built-in C partitioners,
* based on their name. */
PyObject *vs8;
val = cfl_PyUnistr_AsUTF8(vs, &vs8);

if (!strcmp(val, "random"))
rd_kafka_topic_conf_set_partitioner_cb(
tconf, rd_kafka_msg_partitioner_random);
else if (!strcmp(val, "consistent"))
rd_kafka_topic_conf_set_partitioner_cb(
tconf, rd_kafka_msg_partitioner_consistent);
else if (!strcmp(val, "consistent_random"))
rd_kafka_topic_conf_set_partitioner_cb(
tconf, rd_kafka_msg_partitioner_consistent_random);
else {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
"unknown builtin partitioner: %s "
"(available: random, consistent, consistent_random)",
val);
Py_XDECREF(vs8);
Py_DECREF(vs);
return -1;
}

Py_XDECREF(vs8);
Py_DECREF(vs);

} else {
/* Custom partitioner (Python callback) */

if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s requires a callable "
"object", name);
return -1;
}

/* FIXME: Error out until GIL+rdkafka lock-ordering is fixed. */
if (1) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
"custom partitioner support not yet implemented");
return -1;
}

if (self->u.Producer.partitioner_cb)
Py_DECREF(self->u.Producer.partitioner_cb);

self->u.Producer.partitioner_cb = valobj;
Py_INCREF(self->u.Producer.partitioner_cb);

/* Use trampoline to call Python code. */
rd_kafka_topic_conf_set_partitioner_cb(tconf,
Producer_partitioner_cb);
}

return 1;

} else if (!strcmp(name, "delivery.report.only.error")) {
/* Since we allocate msgstate for each produced message
* with a callback we can't use delivery.report.only.error
Expand Down Expand Up @@ -1577,9 +1513,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_ssize_t pos = 0;
PyObject *ko, *vo;
PyObject *confdict = NULL;
int32_t (*partitioner_cb) (const rd_kafka_topic_t *,
const void *, size_t, int32_t,
void *, void *) = partitioner_cb;

if (rd_kafka_version() < MIN_RD_KAFKA_VERSION) {
PyErr_Format(PyExc_RuntimeError,
Expand Down
13 changes: 0 additions & 13 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,6 @@ typedef struct {
*/
struct {
PyObject *default_dr_cb;
PyObject *partitioner_cb; /**< Registered Python partitioner */
int32_t (*c_partitioner_cb) (
const rd_kafka_topic_t *,
const void *, size_t, int32_t,
void *, void *); /**< Fallback C partitioner*/

int dr_only_error; /**< delivery.report.only.error */
} Producer;

Expand Down Expand Up @@ -396,13 +390,6 @@ PyObject *Message_error (Message *self, PyObject *ignore);
extern PyTypeObject ProducerType;


int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque, void *msg_opaque);


/****************************************************************************
*
*
Expand Down
23 changes: 23 additions & 0 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,26 @@ def handle_dr(err, msg):
p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)

p.flush()


def test_set_partitioner_murmur2():
    """
    Test ability to set built-in partitioner type murmur2
    """
    # Constructor raises KafkaException on invalid config, so simply
    # constructing the Producer is the assertion.
    Producer({'partitioner': 'murmur2'})


def test_set_partitioner_murmur2_random():
    """
    Test ability to set built-in partitioner type murmur2_random
    """
    # Constructor raises KafkaException on invalid config, so simply
    # constructing the Producer is the assertion.
    Producer({'partitioner': 'murmur2_random'})


def test_set_invalid_partitioner_murmur():
    """
    Assert an invalid built-in partitioner name raises KafkaException
    with a descriptive error message.
    """
    with pytest.raises(KafkaException) as e:
        Producer({'partitioner': 'murmur'})
    # `e` is a pytest ExceptionInfo wrapper; inspect the raised exception
    # itself via e.value — str(e)'s format is not guaranteed across
    # pytest versions and may include file/line context.
    assert 'Invalid value for configuration property "partitioner": murmur' \
        in str(e.value)