Skip to content

Commit

Permalink
Prevent socket response continuation being called multiple times (#3338
Browse files Browse the repository at this point in the history
…) (#4616)

* Prevent socket response continuation being called multiple times (3338)

Coroutines continuations can only be invoke once, see CancellableContinuation. Use atomic boolean to ensure the continuation is called only once in a thread safe way.

* Update import ordering

* Add warning log message to coroutines continuation

Log a warning message so that it's more visible if a websocket response is invoked multiple times

* Set initial continuation invocation to false

The default value must be false as the continuation property is mutable. The value is always assigned after the object is created.
  • Loading branch information
ShaunPlummer committed Sep 12, 2024
1 parent 8270b18 commit 4838dc7
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.homeassistant.companion.android.common.data.websocket

import io.homeassistant.companion.android.common.data.websocket.impl.entities.SocketResponse
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.SharedFlow
Expand All @@ -24,4 +25,6 @@ data class WebSocketRequest(
val eventTimeout: Long = 0L,
val onEvent: Channel<Any>? = null,
var onResponse: CancellableContinuation<SocketResponse>? = null
)
) {
val hasContinuationBeenInvoked = AtomicBoolean(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,11 @@ class WebSocketRepositoryImpl @AssistedInject constructor(
val id = response.id!!
activeMessages[id]?.let {
it.onResponse?.let { cont ->
if (cont.isActive) cont.resumeWith(Result.success(response))
if (!it.hasContinuationBeenInvoked.getAndSet(true) && cont.isActive) {
cont.resumeWith(Result.success(response))
} else {
Log.w(TAG, "Response continuation has already been invoked for ${response.id}, ${response.event}")
}
}
if (it.eventFlow == null) {
activeMessages.remove(id)
Expand Down Expand Up @@ -818,7 +822,11 @@ class WebSocketRepositoryImpl @AssistedInject constructor(
.filterValues { it.eventFlow == null }
.forEach {
it.value.onResponse?.let { cont ->
if (cont.isActive) cont.resumeWithException(IOException())
if (!it.value.hasContinuationBeenInvoked.getAndSet(true) && cont.isActive) {
cont.resumeWithException(IOException())
} else {
Log.w(TAG, "Response continuation has already been invoked, skipping IOException")
}
}
activeMessages.remove(it.key)
}
Expand Down

0 comments on commit 4838dc7

Please sign in to comment.