diff --git a/src/main/kotlin/io/github/crackthecodeabhi/kreds/pipeline/Transaction.kt b/src/main/kotlin/io/github/crackthecodeabhi/kreds/pipeline/Transaction.kt index 02884ad..e312c34 100644 --- a/src/main/kotlin/io/github/crackthecodeabhi/kreds/pipeline/Transaction.kt +++ b/src/main/kotlin/io/github/crackthecodeabhi/kreds/pipeline/Transaction.kt @@ -200,7 +200,7 @@ internal class TransactionImpl(private val client: DefaultKredsClient) : Exclusi when (val execResult = responseMessages.last()) { is ErrorRedisMessage -> throw KredsRedisDataException(execResult.content()) - is ArrayRedisMessage -> ArrayHandler.doHandle(execResult) + is ArrayRedisMessage -> ArrayHandler.doHandleAndRelease(execResult) else -> throw KredsTransactionException("Invalid data received from Redis.") } } diff --git a/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/Command.kt b/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/Command.kt index 8c139cc..0aff435 100644 --- a/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/Command.kt +++ b/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/Command.kt @@ -69,6 +69,6 @@ internal open class CommandProcessor(private vararg val outputTypeHandlers: M override fun decode(message: RedisMessage): R { if (message is ErrorRedisMessage) throw KredsRedisDataException(message.content()) val handler = outputTypeHandlers.first { it.canHandle(message) } - return handler.doHandle(message) as R + return handler.doHandleAndRelease(message) as R } } \ No newline at end of file diff --git a/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/MessageHandlers.kt b/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/MessageHandlers.kt index 7276944..b34aa9c 100644 --- a/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/MessageHandlers.kt +++ b/src/main/kotlin/io/github/crackthecodeabhi/kreds/protocol/MessageHandlers.kt @@ -22,13 +22,27 @@ package io.github.crackthecodeabhi.kreds.protocol import io.github.crackthecodeabhi.kreds.toDefaultCharset import io.netty.buffer.Unpooled import io.netty.handler.codec.redis.* +import io.netty.util.ReferenceCounted internal interface MessageHandler { fun canHandle(message: RedisMessage): Boolean - fun doHandle(message: RedisMessage): T + /** + * Handles the message and releases it + */ + fun doHandleAndRelease(message: RedisMessage): T +} +internal abstract class AbstractMessageHandler: MessageHandler { + abstract fun doHandle(message: RedisMessage): T + override fun doHandleAndRelease(message: RedisMessage): T = try { + doHandle(message) + } finally { + if (message is ReferenceCounted) { + message.release() + } + } } -internal object SimpleStringHandler : MessageHandler { +internal object SimpleStringHandler : AbstractMessageHandler() { override fun canHandle(message: RedisMessage): Boolean = message is SimpleStringRedisMessage override fun doHandle(message: RedisMessage): String { @@ -37,7 +51,7 @@ internal object SimpleStringHandler : MessageHandler { } } -internal object IntegerHandler : MessageHandler { +internal object IntegerHandler : AbstractMessageHandler() { override fun canHandle(message: RedisMessage): Boolean = message is IntegerRedisMessage override fun doHandle(message: RedisMessage): Long { @@ -46,7 +60,7 @@ internal object IntegerHandler : MessageHandler { } } -internal object BulkStringHandler : MessageHandler { +internal object BulkStringHandler : AbstractMessageHandler() { override fun canHandle(message: RedisMessage): Boolean = message is FullBulkStringRedisMessage override fun doHandle(message: RedisMessage): String? { @@ -57,7 +71,7 @@ internal object BulkStringHandler : MessageHandler { } } -internal object ArrayHandler : MessageHandler?> { +internal object ArrayHandler : AbstractMessageHandler?>() { override fun canHandle(message: RedisMessage): Boolean = message is ArrayRedisMessage override fun doHandle(message: RedisMessage): List<*>? { diff --git a/src/test/kotlin/io/github/crackthecodeabhi/kreds/MemoryLeak.kt b/src/test/kotlin/io/github/crackthecodeabhi/kreds/MemoryLeak.kt new file mode 100644 index 0000000..8e87d03 --- /dev/null +++ b/src/test/kotlin/io/github/crackthecodeabhi/kreds/MemoryLeak.kt @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2023 Abhijith Shivaswamy + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.github.crackthecodeabhi.kreds + +import io.github.crackthecodeabhi.kreds.commands.ClearDB +import io.github.crackthecodeabhi.kreds.commands.ClientSetup +import io.github.crackthecodeabhi.kreds.commands.ClientTearDown +import io.github.crackthecodeabhi.kreds.connection.KredsClient +import io.kotest.core.spec.style.FunSpec +import io.netty.util.ResourceLeakDetector + +class MemoryLeak: FunSpec({ + lateinit var client: KredsClient + val clientSetup = ClientSetup().then { client = it.client } + beforeSpec(clientSetup) + afterSpec(ClientTearDown(clientSetup)) + beforeTest(ClearDB(clientSetup)) + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID) + + // This test should pass, but observe the logs for memory leak errors by netty + test("Memory leak test") { + val key = "key" + val value = "value" + for (i in 0..100) { + client.set(key, value) + client.get(key) + System.gc() + } + } +}) \ No newline at end of file