Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix freeze on reconnection attempts #54

Merged
merged 11 commits into from
Apr 23, 2021
93 changes: 64 additions & 29 deletions src/main/kotlin/io/kuzzle/sdk/protocol/WebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.net.ConnectException
import java.net.SocketException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is a simple boolean not sufficient?

Copy link
Contributor Author

@Shiranuit Shiranuit Apr 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we are doing the check in an other thread (CompletableFuture) which means we need an AtomicBoolean to avoid race conditions with the disconnect method that might be call from a different thread

import kotlin.concurrent.thread

open class WebSocket : AbstractProtocol {
Expand All @@ -33,7 +34,7 @@ open class WebSocket : AbstractProtocol {
private val autoReconnect: Boolean
private val reconnectionDelay: Long
private val reconnectionRetries: Long
private var retryCount: Long = 0
private val stopRetryingToConnect: AtomicBoolean = AtomicBoolean(false)

@KtorExperimentalAPI
protected open var client = HttpClient {
Expand Down Expand Up @@ -61,35 +62,47 @@ open class WebSocket : AbstractProtocol {
this.reconnectionRetries = reconnectionRetries
}

private fun tryToReconnect(): Boolean {
if (!autoReconnect)
return false
@KtorExperimentalAPI
private fun tryToReconnect(): CompletableFuture<Boolean> {
if (!autoReconnect || this.stopRetryingToConnect.get())
return CompletableFuture.completedFuture(false)

state = ProtocolState.RECONNECTING
trigger("networkStateChange", state.toString())
return CompletableFuture.supplyAsync(
fun(): Boolean {
var retryCount: Long = 0
while ((reconnectionRetries == -1L || retryCount < reconnectionRetries) && !this.stopRetryingToConnect.get()) {
// If not infinite, increment retryCount
if (reconnectionRetries != -1L)
retryCount++

while (retryCount < reconnectionRetries) {
retryCount++
Thread.sleep(reconnectionDelay)
try {
connect()
return true
} catch (e: Exception) {
// Nothing to do, just retry
Thread.sleep(reconnectionDelay)
try {
connect()
return true
} catch (e: Exception) {
// Nothing to do, just retry
}
}
this.stopRetryingToConnect.set(false)
return false
}
}
return false
)
}

@KtorExperimentalAPI
override fun connect() {
if (this.stopRetryingToConnect.get())
throw Exception("Connection Aborted")

val wait = CompletableFuture<Void>()
val block: suspend DefaultClientWebSocketSession.() -> Unit = {
ws = this
// @TODO Create enums for events
state = ProtocolState.OPEN
trigger("networkStateChange", ProtocolState.OPEN.toString())
retryCount = 0

thread(start = true) {
while (ws != null) {
val payload = queue.poll()
Expand All @@ -101,7 +114,7 @@ open class WebSocket : AbstractProtocol {
}
}
wait.complete(null)
var reconnected = false
var skip = false
try {
for (frame in incoming) {
when (frame) {
Expand All @@ -112,9 +125,20 @@ open class WebSocket : AbstractProtocol {
}
}
} catch (e: Exception) {
reconnected = tryToReconnect()
skip = true
tryToReconnect().thenApply(
fun (success: Boolean) {
if (!success) {
state = ProtocolState.CLOSE
trigger("networkStateChange", ProtocolState.CLOSE.toString())
ws = null
}
// reset stopRetryingToConnect
stopRetryingToConnect.set(false)
}
)
}
if (!reconnected) {
if (!skip) {
state = ProtocolState.CLOSE
trigger("networkStateChange", ProtocolState.CLOSE.toString())
ws = null
Expand All @@ -135,7 +159,10 @@ open class WebSocket : AbstractProtocol {
block = block
)
}
retryCount = 0

// On connection success
stopRetryingToConnect.set(false)

// This thread is here to let JAVA run until the socket is closed
// In Kotlin this is handled by the block function above but for some reason in JAVA it is
// non blocking.
Expand All @@ -148,11 +175,16 @@ open class WebSocket : AbstractProtocol {
is SocketException,
is IOException -> {
if (state != ProtocolState.RECONNECTING) {
if (!tryToReconnect()) {
wait.completeExceptionally(e)
} else {
wait.complete(null)
}
tryToReconnect().thenAcceptAsync(
fun (success: Boolean) {
if (success) {
wait.complete(null)
} else {
wait.completeExceptionally(e)
}
stopRetryingToConnect.set(false)
}
)
} else {
wait.completeExceptionally(e)
}
Expand All @@ -164,11 +196,14 @@ open class WebSocket : AbstractProtocol {
}

override fun disconnect() {
state = ProtocolState.CLOSE
trigger("networkStateChange", ProtocolState.CLOSE.toString())
GlobalScope.launch {
ws?.close()
ws = null
if (state != ProtocolState.CLOSE) {
state = ProtocolState.CLOSE
trigger("networkStateChange", ProtocolState.CLOSE.toString())
stopRetryingToConnect.set(true)
GlobalScope.launch {
ws?.close()
ws = null
}
}
}

Expand Down