diff --git a/app/src/main/java/com/anytypeio/anytype/di/feature/chats/ChatsDI.kt b/app/src/main/java/com/anytypeio/anytype/di/feature/chats/ChatsDI.kt index 8ea4385221..af96907586 100644 --- a/app/src/main/java/com/anytypeio/anytype/di/feature/chats/ChatsDI.kt +++ b/app/src/main/java/com/anytypeio/anytype/di/feature/chats/ChatsDI.kt @@ -12,6 +12,7 @@ import com.anytypeio.anytype.domain.block.repo.BlockRepository import com.anytypeio.anytype.domain.chats.ChatEventChannel import com.anytypeio.anytype.domain.config.UserSettingsRepository import com.anytypeio.anytype.domain.debugging.Logger +import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer import com.anytypeio.anytype.domain.misc.UrlBuilder import com.anytypeio.anytype.domain.multiplayer.ActiveSpaceMemberSubscriptionContainer import com.anytypeio.anytype.domain.multiplayer.SpaceViewSubscriptionContainer @@ -101,4 +102,5 @@ interface ChatComponentDependencies : ComponentDependencies { fun context(): Context fun spaceManager(): SpaceManager fun notificationPermissionManager(): NotificationPermissionManager + fun storelessSubscriptionContainer(): StorelessSubscriptionContainer } \ No newline at end of file diff --git a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatContainer.kt b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatContainer.kt index 2f7b5c1089..c308a42e68 100644 --- a/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatContainer.kt +++ b/domain/src/main/java/com/anytypeio/anytype/domain/chats/ChatContainer.kt @@ -11,15 +11,21 @@ import com.anytypeio.anytype.core_models.chats.Chat import com.anytypeio.anytype.core_models.primitives.Space import com.anytypeio.anytype.domain.block.repo.BlockRepository import com.anytypeio.anytype.domain.debugging.Logger +import com.anytypeio.anytype.domain.library.StoreSearchByIdsParams +import com.anytypeio.anytype.domain.library.StoreSearchParams +import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer import javax.inject.Inject import kotlin.collections.isNotEmpty import kotlin.collections.toList +import kotlin.math.log +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge @@ -29,7 +35,8 @@ import kotlinx.coroutines.flow.scan class ChatContainer @Inject constructor( private val repo: BlockRepository, private val channel: ChatEventChannel, - private val logger: Logger + private val logger: Logger, + private val subscription: StorelessSubscriptionContainer ) { private val lastMessages = LinkedHashMap() @@ -40,36 +47,28 @@ class ChatContainer @Inject constructor( private val attachments = MutableStateFlow>(emptySet()) private val replies = MutableStateFlow>(emptySet()) - // TODO Naive implementation. Add caching logic - fun fetchAttachments(space: Space) : Flow> { + @OptIn(ExperimentalCoroutinesApi::class) + fun subscribeToAttachments(chat: Id, space: Space) : Flow> { return attachments - .map { ids -> - if (ids.isNotEmpty()) { - repo.searchObjects( - sorts = emptyList(), - limit = 0, - filters = buildList { - DVFilter( - relation = Relations.ID, - value = ids.toList(), - condition = DVFilterCondition.IN - ) - }, - keys = emptyList(), - space = space - ).mapNotNull { - val wrapper = ObjectWrapper.Basic(it) - if (wrapper.isValid) wrapper else null - } - } else { - emptyList() + .flatMapLatest { ids -> + subscription.subscribe( + searchParams = StoreSearchByIdsParams( + subscription = "$chat/$ATTACHMENT_SUBSCRIPTION_POSTFIX", + space = space, + targets = ids.toList(), + keys = ATTACHMENT_KEYS + ) + ).map { wrappers -> + wrappers.associateBy { it.id } + } + } + .catch { e -> + emit(emptyMap()).also { + logger.logException(e, "DROID-2966 Error in the chat attachments pub/sub flow") } } - .distinctUntilChanged() - .map { wrappers -> wrappers.associateBy { it.id } } } - // TODO Naive implementation. Add caching logic fun fetchReplies(chat: Id) : Flow> { return replies .map { ids -> @@ -85,7 +84,12 @@ class ChatContainer @Inject constructor( } } .distinctUntilChanged() - .map { messages -> messages.associate { it.id to it } } + .map { messages -> messages.associateBy { it.id } } + .catch { e -> + emit(emptyMap()).also { + logger.logException(e, "DROID-2966 Error while fetching chat replies") + } + } } fun watchWhileTrackingAttachments(chat: Id): Flow { @@ -108,6 +112,9 @@ class ChatContainer @Inject constructor( suspend fun stop(chat: Id) { runCatching { repo.unsubscribeChat(chat) + repo.cancelObjectSearchSubscription( + listOf("$chat/$ATTACHMENT_SUBSCRIPTION_POSTFIX") + ) }.onFailure { logger.logWarning("DROID-2966 Error while unsubscribing from chat") }.onSuccess { @@ -674,6 +681,27 @@ class ChatContainer @Inject constructor( // TODO reduce message size to reduce UI and VM overload. private const val MAX_CHAT_CACHE_SIZE = 1000 private const val LAST_MESSAGES_MAX_SIZE = 10 + private const val ATTACHMENT_SUBSCRIPTION_POSTFIX = "attachments" + + + private val ATTACHMENT_KEYS = listOf( + Relations.ID, + Relations.SPACE_ID, + Relations.NAME, + Relations.ICON_IMAGE, + Relations.ICON_EMOJI, + Relations.ICON_NAME, + Relations.ICON_OPTION, + Relations.TYPE, + Relations.LAYOUT, + Relations.IS_ARCHIVED, + Relations.IS_DELETED, + Relations.DONE, + Relations.SNIPPET, + Relations.SIZE_IN_BYTES, + Relations.FILE_MIME_TYPE, + Relations.FILE_EXT, + ) } data class ChatMessageMeta(val id: Id, val order: String) diff --git a/domain/src/test/java/com/anytypeio/anytype/domain/chats/ChatContainerTest.kt b/domain/src/test/java/com/anytypeio/anytype/domain/chats/ChatContainerTest.kt index a726ee4365..3a62181392 100644 --- a/domain/src/test/java/com/anytypeio/anytype/domain/chats/ChatContainerTest.kt +++ b/domain/src/test/java/com/anytypeio/anytype/domain/chats/ChatContainerTest.kt @@ -11,6 +11,7 @@ import com.anytypeio.anytype.domain.base.AppCoroutineDispatchers import com.anytypeio.anytype.domain.block.repo.BlockRepository import com.anytypeio.anytype.domain.common.DefaultCoroutineTestRule import com.anytypeio.anytype.domain.debugging.Logger +import com.anytypeio.anytype.domain.library.StorelessSubscriptionContainer import com.anytypeio.anytype.test_utils.MockDataFactory import kotlin.test.assertEquals import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -47,6 +48,9 @@ class ChatContainerTest { @Mock lateinit var logger: Logger + @Mock + lateinit var storelessSubscriptionContainer: StorelessSubscriptionContainer + private val givenChatID = MockDataFactory.randomUuid() @Before @@ -61,7 +65,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val msg = StubChatMessage( @@ -125,7 +130,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val initialMsg = StubChatMessage( @@ -187,7 +193,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val initialMsg = StubChatMessage( @@ -258,7 +265,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val initialMsg = StubChatMessage( @@ -334,7 +342,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val firstMessage = StubChatMessage(order = "B") @@ -398,7 +407,8 @@ class ChatContainerTest { val container = ChatContainer( repo = repo, channel = channel, - logger = logger + logger = logger, + subscription = storelessSubscriptionContainer ) val messages = buildList { diff --git a/feature-chats/src/main/java/com/anytypeio/anytype/feature_chats/presentation/ChatViewModel.kt b/feature-chats/src/main/java/com/anytypeio/anytype/feature_chats/presentation/ChatViewModel.kt index 6b21e4e66c..c34fac0f14 100644 --- a/feature-chats/src/main/java/com/anytypeio/anytype/feature_chats/presentation/ChatViewModel.kt +++ b/feature-chats/src/main/java/com/anytypeio/anytype/feature_chats/presentation/ChatViewModel.kt @@ -173,10 +173,8 @@ class ChatViewModel @Inject constructor( chat: Id ) { combine( - chatContainer - .watchWhileTrackingAttachments(chat = chat).distinctUntilChanged() - , - chatContainer.fetchAttachments(vmParams.space).distinctUntilChanged(), + chatContainer.watchWhileTrackingAttachments(chat = chat).distinctUntilChanged(), + chatContainer.subscribeToAttachments(vmParams.ctx, vmParams.space).distinctUntilChanged(), chatContainer.fetchReplies(chat = chat).distinctUntilChanged() ) { result, dependencies, replies -> Timber.d("DROID-2966 Chat counter state from container: ${result.state}")