mirror of
https://github.com/anyproto/anytype-kotlin.git
synced 2025-06-08 05:47:05 +09:00
DROID-3634 Notifications | Handle pushKeyUpdates and store (#2379)
This commit is contained in:
parent
c0ba062b81
commit
1ef93d72db
8 changed files with 619 additions and 0 deletions
|
@ -0,0 +1,58 @@
|
|||
package com.anytypeio.anytype.middleware.interactor.events
|
||||
|
||||
import com.anytypeio.anytype.core_models.chats.PushKeyUpdate
|
||||
import com.anytypeio.anytype.core_utils.ext.cancel
|
||||
import com.anytypeio.anytype.data.auth.event.PushKeyRemoteChannel
|
||||
import com.anytypeio.anytype.middleware.interactor.EventHandlerChannel
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.launch
|
||||
import timber.log.Timber
|
||||
|
||||
class PushKeyMiddlewareChannel(
|
||||
private val scope: CoroutineScope,
|
||||
private val channel: EventHandlerChannel,
|
||||
private val dispatcher: CoroutineDispatcher
|
||||
) : PushKeyRemoteChannel {
|
||||
|
||||
private val jobs = mutableListOf<Job>()
|
||||
|
||||
private val _pushKeyStatus = MutableStateFlow<PushKeyUpdate>(PushKeyUpdate.EMPTY)
|
||||
val pushKeyStatus: Flow<PushKeyUpdate> = _pushKeyStatus.asStateFlow()
|
||||
|
||||
override fun start() {
|
||||
Timber.i("PushKeyMiddlewareChannel start")
|
||||
jobs.cancel()
|
||||
jobs += scope.launch(dispatcher) {
|
||||
channel.flow()
|
||||
.catch {
|
||||
Timber.w(it, "Error collecting push key updates")
|
||||
}
|
||||
.collect { emission ->
|
||||
emission.messages.forEach { message ->
|
||||
message.pushEncryptionKeyUpdate?.let {
|
||||
val pushKeyUpdate = PushKeyUpdate(
|
||||
encryptionKeyId = it.encryptionKeyId,
|
||||
encryptionKey = it.encryptionKey
|
||||
)
|
||||
_pushKeyStatus.value = pushKeyUpdate
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
Timber.i("PushKeyMiddlewareChannel stop")
|
||||
jobs.cancel()
|
||||
}
|
||||
|
||||
override fun observe(): Flow<PushKeyUpdate> {
|
||||
return pushKeyStatus
|
||||
}
|
||||
}
|
|
@ -0,0 +1,227 @@
|
|||
package com.anytypeio.anytype.middleware.interactor
|
||||
|
||||
import app.cash.turbine.test
|
||||
import app.cash.turbine.turbineScope
|
||||
import com.anytypeio.anytype.core_models.chats.PushKeyUpdate
|
||||
import com.anytypeio.anytype.middleware.interactor.events.PushKeyMiddlewareChannel
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.test.StandardTestDispatcher
|
||||
import kotlinx.coroutines.test.TestScope
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import net.bytebuddy.utility.RandomString
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
class PushKeyMiddlewareChannelTest {
|
||||
|
||||
private val dispatcher = StandardTestDispatcher(name = "Default test dispatcher")
|
||||
private val testScope = TestScope(dispatcher)
|
||||
lateinit var eventHandlerChannel: EventHandlerChannel
|
||||
private lateinit var channel: PushKeyMiddlewareChannel
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
eventHandlerChannel = EventHandlerChannelImpl()
|
||||
channel = PushKeyMiddlewareChannel(
|
||||
scope = testScope,
|
||||
channel = eventHandlerChannel,
|
||||
dispatcher = dispatcher
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should emit empty initially`() = runTest {
|
||||
// Given
|
||||
val initialValue = channel.observe().first()
|
||||
|
||||
// Then
|
||||
assertEquals(PushKeyUpdate.EMPTY, initialValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should emit push key update when receiving two valid events`() = runTest(dispatcher) {
|
||||
|
||||
turbineScope {
|
||||
// Given
|
||||
val expectedUpdate1 = PushKeyUpdate(
|
||||
encryptionKeyId = RandomString.make(),
|
||||
encryptionKey = RandomString.make(),
|
||||
)
|
||||
|
||||
val event1 = anytype.Event(
|
||||
messages = listOf(
|
||||
anytype.Event.Message(
|
||||
pushEncryptionKeyUpdate = anytype.Event.PushEncryptionKey.Update(
|
||||
encryptionKeyId = expectedUpdate1.encryptionKeyId,
|
||||
encryptionKey = expectedUpdate1.encryptionKey
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val expectedUpdate2 = PushKeyUpdate(
|
||||
encryptionKeyId = RandomString.make(),
|
||||
encryptionKey = RandomString.make()
|
||||
)
|
||||
|
||||
val event2 = anytype.Event(
|
||||
messages = listOf(
|
||||
anytype.Event.Message(
|
||||
pushEncryptionKeyUpdate = anytype.Event.PushEncryptionKey.Update(
|
||||
encryptionKeyId = expectedUpdate2.encryptionKeyId,
|
||||
encryptionKey = expectedUpdate2.encryptionKey
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
// When
|
||||
channel.start()
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
// Then
|
||||
channel.observe().test {
|
||||
|
||||
val emittedValue = awaitItem()
|
||||
assertEquals(PushKeyUpdate.EMPTY, emittedValue)
|
||||
|
||||
eventHandlerChannel.emit(event1)
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
val update1 = awaitItem()
|
||||
assertEquals(expectedUpdate1, update1)
|
||||
|
||||
eventHandlerChannel.emit(event2)
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
val update2 = awaitItem()
|
||||
assertEquals(expectedUpdate2, update2)
|
||||
|
||||
ensureAllEventsConsumed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should not emit update when receiving invalid event`() = runTest(dispatcher) {
|
||||
turbineScope {
|
||||
// Given
|
||||
|
||||
val event = anytype.Event(
|
||||
messages = listOf(
|
||||
anytype.Event.Message(
|
||||
p2pStatusUpdate = anytype.Event.P2PStatus.Update(
|
||||
spaceId = RandomString.make()
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
// When
|
||||
channel.start()
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
// Then
|
||||
channel.observe().test {
|
||||
|
||||
val emittedValue = awaitItem()
|
||||
assertEquals(PushKeyUpdate.EMPTY, emittedValue)
|
||||
|
||||
eventHandlerChannel.emit(event)
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
ensureAllEventsConsumed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should stop processing events after stop is called`() = runTest(dispatcher) {
|
||||
|
||||
turbineScope {
|
||||
// Given
|
||||
|
||||
val expectedUpdate = PushKeyUpdate(
|
||||
encryptionKeyId = RandomString.make(),
|
||||
encryptionKey = RandomString.make(),
|
||||
)
|
||||
|
||||
val event = anytype.Event(
|
||||
messages = listOf(
|
||||
anytype.Event.Message(
|
||||
pushEncryptionKeyUpdate = anytype.Event.PushEncryptionKey.Update(
|
||||
encryptionKeyId = expectedUpdate.encryptionKeyId,
|
||||
encryptionKey = expectedUpdate.encryptionKey
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
// When
|
||||
channel.start()
|
||||
channel.stop()
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
// Then
|
||||
channel.observe().test {
|
||||
|
||||
val emittedValue = awaitItem()
|
||||
assertEquals(PushKeyUpdate.EMPTY, emittedValue)
|
||||
|
||||
eventHandlerChannel.emit(event)
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
ensureAllEventsConsumed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should handle multiple messages in single emission`() = runTest {
|
||||
|
||||
turbineScope {
|
||||
turbineScope {
|
||||
// Given
|
||||
val expectedUpdate1 = PushKeyUpdate(
|
||||
encryptionKeyId = RandomString.make(),
|
||||
encryptionKey = RandomString.make(),
|
||||
)
|
||||
|
||||
val event1 = anytype.Event(
|
||||
messages = listOf(
|
||||
anytype.Event.Message(pushEncryptionKeyUpdate = null),
|
||||
anytype.Event.Message(
|
||||
pushEncryptionKeyUpdate = anytype.Event.PushEncryptionKey.Update(
|
||||
encryptionKeyId = expectedUpdate1.encryptionKeyId,
|
||||
encryptionKey = expectedUpdate1.encryptionKey
|
||||
)
|
||||
),
|
||||
anytype.Event.Message(pushEncryptionKeyUpdate = null),
|
||||
)
|
||||
)
|
||||
|
||||
// When
|
||||
channel.start()
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
// Then
|
||||
channel.observe().test {
|
||||
|
||||
val emittedValue = awaitItem()
|
||||
assertEquals(PushKeyUpdate.EMPTY, emittedValue)
|
||||
|
||||
eventHandlerChannel.emit(event1)
|
||||
dispatcher.scheduler.advanceUntilIdle()
|
||||
|
||||
val update1 = awaitItem()
|
||||
assertEquals(expectedUpdate1, update1)
|
||||
|
||||
ensureAllEventsConsumed()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue