1
0
Fork 0
mirror of https://github.com/anyproto/anytype-kotlin.git synced 2025-06-08 05:47:05 +09:00

DROID-2966 Chats | Enhancement | Add pub/sub for attachments (#2477)

This commit is contained in:
Evgenii Kozlov 2025-05-29 10:50:20 +02:00 committed by GitHub
parent bf12edd89f
commit 71ab93fc96
Signed by: github
GPG key ID: B5690EEEBB952194
4 changed files with 75 additions and 37 deletions

View file

@ -12,6 +12,7 @@ import com.anytypeio.anytype.domain.block.repo.BlockRepository
import com.anytypeio.anytype.domain.chats.ChatEventChannel
import com.anytypeio.anytype.domain.config.UserSettingsRepository
import com.anytypeio.anytype.domain.debugging.Logger
import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer
import com.anytypeio.anytype.domain.misc.UrlBuilder
import com.anytypeio.anytype.domain.multiplayer.ActiveSpaceMemberSubscriptionContainer
import com.anytypeio.anytype.domain.multiplayer.SpaceViewSubscriptionContainer
@ -101,4 +102,5 @@ interface ChatComponentDependencies : ComponentDependencies {
fun context(): Context
fun spaceManager(): SpaceManager
fun notificationPermissionManager(): NotificationPermissionManager
fun storelessSubscriptionContainer(): StorelessSubscriptionContainer
}

View file

@ -11,15 +11,21 @@ import com.anytypeio.anytype.core_models.chats.Chat
import com.anytypeio.anytype.core_models.primitives.Space
import com.anytypeio.anytype.domain.block.repo.BlockRepository
import com.anytypeio.anytype.domain.debugging.Logger
import com.anytypeio.anytype.domain.library.StoreSearchByIdsParams
import com.anytypeio.anytype.domain.library.StoreSearchParams
import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer
import javax.inject.Inject
import kotlin.collections.isNotEmpty
import kotlin.collections.toList
import kotlin.math.log
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
@ -29,7 +35,8 @@ import kotlinx.coroutines.flow.scan
class ChatContainer @Inject constructor(
private val repo: BlockRepository,
private val channel: ChatEventChannel,
private val logger: Logger
private val logger: Logger,
private val subscription: StorelessSubscriptionContainer
) {
private val lastMessages = LinkedHashMap<Id, ChatMessageMeta>()
@ -40,36 +47,28 @@ class ChatContainer @Inject constructor(
private val attachments = MutableStateFlow<Set<Id>>(emptySet())
private val replies = MutableStateFlow<Set<Id>>(emptySet())
// TODO Naive implementation. Add caching logic
fun fetchAttachments(space: Space) : Flow<Map<Id, ObjectWrapper.Basic>> {
@OptIn(ExperimentalCoroutinesApi::class)
fun subscribeToAttachments(chat: Id, space: Space) : Flow<Map<Id, ObjectWrapper.Basic>> {
return attachments
.map { ids ->
if (ids.isNotEmpty()) {
repo.searchObjects(
sorts = emptyList(),
limit = 0,
filters = buildList {
DVFilter(
relation = Relations.ID,
value = ids.toList(),
condition = DVFilterCondition.IN
)
},
keys = emptyList(),
space = space
).mapNotNull {
val wrapper = ObjectWrapper.Basic(it)
if (wrapper.isValid) wrapper else null
}
} else {
emptyList()
.flatMapLatest { ids ->
subscription.subscribe(
searchParams = StoreSearchByIdsParams(
subscription = "$chat/$ATTACHMENT_SUBSCRIPTION_POSTFIX",
space = space,
targets = ids.toList(),
keys = ATTACHMENT_KEYS
)
).map { wrappers ->
wrappers.associateBy { it.id }
}
}
.catch { e ->
emit(emptyMap()).also {
logger.logException(e, "DROID-2966 Error in the chat attachments pub/sub flow")
}
}
.distinctUntilChanged()
.map { wrappers -> wrappers.associateBy { it.id } }
}
// TODO Naive implementation. Add caching logic
fun fetchReplies(chat: Id) : Flow<Map<Id, Chat.Message>> {
return replies
.map { ids ->
@ -85,7 +84,12 @@ class ChatContainer @Inject constructor(
}
}
.distinctUntilChanged()
.map { messages -> messages.associate { it.id to it } }
.map { messages -> messages.associateBy { it.id } }
.catch { e ->
emit(emptyMap()).also {
logger.logException(e, "DROID-2966 Error while fetching chat replies")
}
}
}
fun watchWhileTrackingAttachments(chat: Id): Flow<ChatStreamState> {
@ -108,6 +112,9 @@ class ChatContainer @Inject constructor(
suspend fun stop(chat: Id) {
runCatching {
repo.unsubscribeChat(chat)
repo.cancelObjectSearchSubscription(
listOf("$chat/$ATTACHMENT_SUBSCRIPTION_POSTFIX")
)
}.onFailure {
logger.logWarning("DROID-2966 Error while unsubscribing from chat")
}.onSuccess {
@ -674,6 +681,27 @@ class ChatContainer @Inject constructor(
// TODO reduce message size to reduce UI and VM overload.
private const val MAX_CHAT_CACHE_SIZE = 1000
private const val LAST_MESSAGES_MAX_SIZE = 10
private const val ATTACHMENT_SUBSCRIPTION_POSTFIX = "attachments"
private val ATTACHMENT_KEYS = listOf(
Relations.ID,
Relations.SPACE_ID,
Relations.NAME,
Relations.ICON_IMAGE,
Relations.ICON_EMOJI,
Relations.ICON_NAME,
Relations.ICON_OPTION,
Relations.TYPE,
Relations.LAYOUT,
Relations.IS_ARCHIVED,
Relations.IS_DELETED,
Relations.DONE,
Relations.SNIPPET,
Relations.SIZE_IN_BYTES,
Relations.FILE_MIME_TYPE,
Relations.FILE_EXT,
)
}
data class ChatMessageMeta(val id: Id, val order: String)

View file

@ -11,6 +11,7 @@ import com.anytypeio.anytype.domain.base.AppCoroutineDispatchers
import com.anytypeio.anytype.domain.block.repo.BlockRepository
import com.anytypeio.anytype.domain.common.DefaultCoroutineTestRule
import com.anytypeio.anytype.domain.debugging.Logger
import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer
import com.anytypeio.anytype.test_utils.MockDataFactory
import kotlin.test.assertEquals
import kotlinx.coroutines.ExperimentalCoroutinesApi
@ -47,6 +48,9 @@ class ChatContainerTest {
@Mock
lateinit var logger: Logger
@Mock
lateinit var storelessSubscriptionContainer: StorelessSubscriptionContainer
private val givenChatID = MockDataFactory.randomUuid()
@Before
@ -61,7 +65,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val msg = StubChatMessage(
@ -125,7 +130,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val initialMsg = StubChatMessage(
@ -187,7 +193,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val initialMsg = StubChatMessage(
@ -258,7 +265,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val initialMsg = StubChatMessage(
@ -334,7 +342,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val firstMessage = StubChatMessage(order = "B")
@ -398,7 +407,8 @@ class ChatContainerTest {
val container = ChatContainer(
repo = repo,
channel = channel,
logger = logger
logger = logger,
subscription = storelessSubscriptionContainer
)
val messages = buildList {

View file

@ -173,10 +173,8 @@ class ChatViewModel @Inject constructor(
chat: Id
) {
combine(
chatContainer
.watchWhileTrackingAttachments(chat = chat).distinctUntilChanged()
,
chatContainer.fetchAttachments(vmParams.space).distinctUntilChanged(),
chatContainer.watchWhileTrackingAttachments(chat = chat).distinctUntilChanged(),
chatContainer.subscribeToAttachments(vmParams.ctx, vmParams.space).distinctUntilChanged(),
chatContainer.fetchReplies(chat = chat).distinctUntilChanged()
) { result, dependencies, replies ->
Timber.d("DROID-2966 Chat counter state from container: ${result.state}")