Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release redis messages when it is reference counted #124

Merged
merged 2 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ internal class TransactionImpl(private val client: DefaultKredsClient) : Exclusi

when (val execResult = responseMessages.last()) {
is ErrorRedisMessage -> throw KredsRedisDataException(execResult.content())
is ArrayRedisMessage -> ArrayHandler.doHandle(execResult)
is ArrayRedisMessage -> ArrayHandler.doHandleAndRelease(execResult)
else -> throw KredsTransactionException("Invalid data received from Redis.")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ internal open class CommandProcessor<R>(private vararg val outputTypeHandlers: M
override fun decode(message: RedisMessage): R {
if (message is ErrorRedisMessage) throw KredsRedisDataException(message.content())
val handler = outputTypeHandlers.first { it.canHandle(message) }
return handler.doHandle(message) as R
return handler.doHandleAndRelease(message) as R
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@ package io.github.crackthecodeabhi.kreds.protocol
import io.github.crackthecodeabhi.kreds.toDefaultCharset
import io.netty.buffer.Unpooled
import io.netty.handler.codec.redis.*
import io.netty.util.ReferenceCounted

internal interface MessageHandler<T> {
fun canHandle(message: RedisMessage): Boolean
fun doHandle(message: RedisMessage): T
/**
* Handles the message and releases it
*/
fun doHandleAndRelease(message: RedisMessage): T
}
internal abstract class AbstractMessageHandler<T>: MessageHandler<T> {
abstract fun doHandle(message: RedisMessage): T
override fun doHandleAndRelease(message: RedisMessage): T = try {
doHandle(message)
} finally {
if (message is ReferenceCounted) {
message.release()
}
}
}

internal object SimpleStringHandler : MessageHandler<String> {
internal object SimpleStringHandler : AbstractMessageHandler<String>() {
override fun canHandle(message: RedisMessage): Boolean = message is SimpleStringRedisMessage

override fun doHandle(message: RedisMessage): String {
Expand All @@ -37,7 +51,7 @@ internal object SimpleStringHandler : MessageHandler<String> {
}
}

internal object IntegerHandler : MessageHandler<Long> {
internal object IntegerHandler : AbstractMessageHandler<Long>() {
override fun canHandle(message: RedisMessage): Boolean = message is IntegerRedisMessage

override fun doHandle(message: RedisMessage): Long {
Expand All @@ -46,7 +60,7 @@ internal object IntegerHandler : MessageHandler<Long> {
}
}

internal object BulkStringHandler : MessageHandler<String?> {
internal object BulkStringHandler : AbstractMessageHandler<String?>() {
override fun canHandle(message: RedisMessage): Boolean = message is FullBulkStringRedisMessage

override fun doHandle(message: RedisMessage): String? {
Expand All @@ -57,7 +71,7 @@ internal object BulkStringHandler : MessageHandler<String?> {
}
}

internal object ArrayHandler : MessageHandler<List<*>?> {
internal object ArrayHandler : AbstractMessageHandler<List<*>?>() {
override fun canHandle(message: RedisMessage): Boolean = message is ArrayRedisMessage

override fun doHandle(message: RedisMessage): List<*>? {
Expand Down
47 changes: 47 additions & 0 deletions src/test/kotlin/io/github/crackthecodeabhi/kreds/MemoryLeak.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023 Abhijith Shivaswamy
* See the notice.md file distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.github.crackthecodeabhi.kreds

import io.github.crackthecodeabhi.kreds.commands.ClearDB
import io.github.crackthecodeabhi.kreds.commands.ClientSetup
import io.github.crackthecodeabhi.kreds.commands.ClientTearDown
import io.github.crackthecodeabhi.kreds.connection.KredsClient
import io.kotest.core.spec.style.FunSpec
import io.netty.util.ResourceLeakDetector

class MemoryLeak: FunSpec({
lateinit var client: KredsClient
val clientSetup = ClientSetup().then { client = it.client }
beforeSpec(clientSetup)
afterSpec(ClientTearDown(clientSetup))
beforeTest(ClearDB(clientSetup))
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID)

// This test should pass, but observe the logs for memory leak errors by netty
test("Memory leak test") {
val key = "key"
val value = "value"
for (i in 0..100) {
client.set(key, value)
client.get(key)
System.gc()
}
}
})