diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c index 07c2ba30c..9596bac09 100644 --- a/confluent_kafka/src/Producer.c +++ b/confluent_kafka/src/Producer.c @@ -48,7 +48,6 @@ struct Producer_msgstate { Handle *self; PyObject *dr_cb; - PyObject *partitioner_cb; }; @@ -58,12 +57,9 @@ struct Producer_msgstate { */ static __inline struct Producer_msgstate * Producer_msgstate_new (Handle *self, - PyObject *dr_cb, PyObject *partitioner_cb) { + PyObject *dr_cb) { struct Producer_msgstate *msgstate; - if (!dr_cb && !partitioner_cb) - return NULL; - msgstate = calloc(1, sizeof(*msgstate)); msgstate->self = self; @@ -71,10 +67,6 @@ Producer_msgstate_new (Handle *self, msgstate->dr_cb = dr_cb; Py_INCREF(dr_cb); } - if (partitioner_cb) { - msgstate->partitioner_cb = partitioner_cb; - Py_INCREF(partitioner_cb); - } return msgstate; } @@ -82,8 +74,6 @@ static __inline void Producer_msgstate_destroy (struct Producer_msgstate *msgstate) { if (msgstate->dr_cb) Py_DECREF(msgstate->dr_cb); - if (msgstate->partitioner_cb) - Py_DECREF(msgstate->partitioner_cb); free(msgstate); } @@ -93,10 +83,6 @@ static void Producer_clear0 (Handle *self) { Py_DECREF(self->u.Producer.default_dr_cb); self->u.Producer.default_dr_cb = NULL; } - if (self->u.Producer.partitioner_cb) { - Py_DECREF(self->u.Producer.partitioner_cb); - self->u.Producer.partitioner_cb = NULL; - } } static int Producer_clear (Handle *self) { @@ -128,8 +114,6 @@ static int Producer_traverse (Handle *self, visitproc visit, void *arg) { if (self->u.Producer.default_dr_cb) Py_VISIT(self->u.Producer.default_dr_cb); - if (self->u.Producer.partitioner_cb) - Py_VISIT(self->u.Producer.partitioner_cb); Handle_traverse(self, visit, arg); @@ -191,71 +175,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, } -/** - * FIXME: The partitioner is currently broken due to threading/GIL issues. - */ -int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, - const void *keydata, - size_t keylen, - int32_t partition_cnt, - void *rkt_opaque, void *msg_opaque) { - Handle *self = rkt_opaque; - struct Producer_msgstate *msgstate = msg_opaque; - PyGILState_STATE gstate; - PyObject *result; - PyObject *args; - int32_t r = RD_KAFKA_PARTITION_UA; - - if (!msgstate) { - /* Fall back on default C partitioner if neither a per-msg - * partitioner nor a default Python partitioner is available */ - return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen, - partition_cnt, - rkt_opaque, msg_opaque); - } - - gstate = PyGILState_Ensure(); - - if (!msgstate->partitioner_cb) { - /* Fall back on default C partitioner if neither a per-msg - * partitioner nor a default Python partitioner is available */ - r = msgstate->self->u.Producer.c_partitioner_cb(rkt, - keydata, keylen, - partition_cnt, - rkt_opaque, - msg_opaque); - goto done; - } - - args = Py_BuildValue("(s#l)", - (const char *)keydata, (int)keylen, - (long)partition_cnt); - if (!args) { - cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL, - "Unable to build callback args"); - goto done; - } - - - result = PyObject_CallObject(msgstate->partitioner_cb, args); - Py_DECREF(args); - - if (result) { - r = (int32_t)cfl_PyInt_AsInt(result); - if (PyErr_Occurred()) - printf("FIXME: partition_cb returned wrong type " - "(expected long), how to propagate?\n"); - Py_DECREF(result); - } else { - printf("FIXME: partitioner_cb crashed, how to propagate?\n"); - } - - done: - PyGILState_Release(gstate); - return r; -} - - #if HAVE_PRODUCEV static rd_kafka_resp_err_t Producer_producev (Handle *self, @@ -313,7 +232,7 @@ static PyObject *Producer_produce (Handle *self, PyObject *args, const char *topic, *value = NULL, *key = NULL; int value_len = 0, key_len = 0; int partition = RD_KAFKA_PARTITION_UA; - PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL; + PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL; long long timestamp = 0; rd_kafka_resp_err_t err; struct Producer_msgstate *msgstate; @@ -327,17 +246,16 @@ static PyObject *Producer_produce (Handle *self, PyObject *args, "partition", "callback", "on_delivery", /* Alias */ - "partitioner", "timestamp", "headers", NULL }; if (!PyArg_ParseTupleAndKeywords(args, kwargs, - "s|z#z#iOOOLO" + "s|z#z#iOOLO" , kws, &topic, &value, &value_len, &key, &key_len, &partition, - &dr_cb, &dr_cb2, &partitioner_cb, + &dr_cb, &dr_cb2, ×tamp, &headers)) return NULL; @@ -376,13 +294,10 @@ static PyObject *Producer_produce (Handle *self, PyObject *args, if (!dr_cb || dr_cb == Py_None) dr_cb = self->u.Producer.default_dr_cb; - if (!partitioner_cb || partitioner_cb == Py_None) - partitioner_cb = self->u.Producer.partitioner_cb; - /* Create msgstate if necessary, may return NULL if no callbacks * are wanted. */ - msgstate = Producer_msgstate_new(self, dr_cb, partitioner_cb); + msgstate = Producer_msgstate_new(self, dr_cb); /* Produce message */ #if HAVE_PRODUCEV @@ -503,8 +418,8 @@ static PyMethodDef Producer_methods[] = { " :param str topic: Topic to produce message to\n" " :param str|bytes value: Message payload\n" " :param str|bytes key: Message key\n" - " :param int partition: Partition to produce to, elses uses the " - "configured partitioner.\n" + " :param int partition: Partition to produce to, else uses the " + "configured built-in partitioner.\n" " :param func on_delivery(err,msg): Delivery report callback to call " "(from :py:func:`poll()` or :py:func:`flush()`) on successful or " "failed delivery\n" diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index af06fdf27..0ad04398d 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -1451,70 +1451,6 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf, return 1; - } else if (!strcasecmp(name, "partitioner") || - !strcasecmp(name, "partitioner_callback")) { - - if ((vs = cfl_PyObject_Unistr(valobj))) { - /* Use built-in C partitioners, - * based on their name. */ - PyObject *vs8; - val = cfl_PyUnistr_AsUTF8(vs, &vs8); - - if (!strcmp(val, "random")) - rd_kafka_topic_conf_set_partitioner_cb( - tconf, rd_kafka_msg_partitioner_random); - else if (!strcmp(val, "consistent")) - rd_kafka_topic_conf_set_partitioner_cb( - tconf, rd_kafka_msg_partitioner_consistent); - else if (!strcmp(val, "consistent_random")) - rd_kafka_topic_conf_set_partitioner_cb( - tconf, rd_kafka_msg_partitioner_consistent_random); - else { - cfl_PyErr_Format( - RD_KAFKA_RESP_ERR__INVALID_ARG, - "unknown builtin partitioner: %s " - "(available: random, consistent, consistent_random)", - val); - Py_XDECREF(vs8); - Py_DECREF(vs); - return -1; - } - - Py_XDECREF(vs8); - Py_DECREF(vs); - - } else { - /* Custom partitioner (Python callback) */ - - if (!PyCallable_Check(valobj)) { - cfl_PyErr_Format( - RD_KAFKA_RESP_ERR__INVALID_ARG, - "%s requires a callable " - "object", name); - return -1; - } - - /* FIXME: Error out until GIL+rdkafka lock-ordering is fixed. */ - if (1) { - cfl_PyErr_Format( - RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, - "custom partitioner support not yet implemented"); - return -1; - } - - if (self->u.Producer.partitioner_cb) - Py_DECREF(self->u.Producer.partitioner_cb); - - self->u.Producer.partitioner_cb = valobj; - Py_INCREF(self->u.Producer.partitioner_cb); - - /* Use trampoline to call Python code. */ - rd_kafka_topic_conf_set_partitioner_cb(tconf, - Producer_partitioner_cb); - } - - return 1; - } else if (!strcmp(name, "delivery.report.only.error")) { /* Since we allocate msgstate for each produced message * with a callback we can't use delivery.report.only.error @@ -1577,9 +1513,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_ssize_t pos = 0; PyObject *ko, *vo; PyObject *confdict = NULL; - int32_t (*partitioner_cb) (const rd_kafka_topic_t *, - const void *, size_t, int32_t, - void *, void *) = partitioner_cb; if (rd_kafka_version() < MIN_RD_KAFKA_VERSION) { PyErr_Format(PyExc_RuntimeError, diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h index f8333c32d..1259fb92e 100644 --- a/confluent_kafka/src/confluent_kafka.h +++ b/confluent_kafka/src/confluent_kafka.h @@ -201,12 +201,6 @@ typedef struct { */ struct { PyObject *default_dr_cb; - PyObject *partitioner_cb; /**< Registered Python partitioner */ - int32_t (*c_partitioner_cb) ( - const rd_kafka_topic_t *, - const void *, size_t, int32_t, - void *, void *); /**< Fallback C partitioner*/ - int dr_only_error; /**< delivery.report.only.error */ } Producer; @@ -396,13 +390,6 @@ PyObject *Message_error (Message *self, PyObject *ignore); extern PyTypeObject ProducerType; -int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, - const void *keydata, - size_t keylen, - int32_t partition_cnt, - void *rkt_opaque, void *msg_opaque); - - /**************************************************************************** * * diff --git a/tests/test_Producer.py b/tests/test_Producer.py index e736b8631..ac009ea58 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -168,3 +168,26 @@ def handle_dr(err, msg): p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr) p.flush() + + +def test_set_partitioner_murmur2(): + """ + Test ability to set built-in partitioner type murmur + """ + Producer({'partitioner': 'murmur2'}) + + +def test_set_partitioner_murmur2_random(): + """ + Test ability to set built-in partitioner type murmur2_random + """ + Producer({'partitioner': 'murmur2_random'}) + + +def test_set_invalid_partitioner_murmur(): + """ + Assert invalid partitioner raises KafkaException + """ + with pytest.raises(KafkaException) as e: + Producer({'partitioner': 'murmur'}) + assert 'Invalid value for configuration property "partitioner": murmur' in str(e)