import { Injectable } from '@angular/core';
import type { Observable } from 'rxjs';
import { BehaviorSubject, Subject, forkJoin } from 'rxjs';
import { filter, finalize, map, share, switchMap, tap } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
import { ChatChannelAttributes } from 'app/shared/messaging-chat/model/chat-channel-attributes.model';
import { ThreadAttributeUpdatedEvent } from 'app/shared/messaging-chat/model/thread-attribute-updated-event.model';
import type { Client, Message } from '@twilio/conversations';
import type {
    ChatChannelEvent,
    ChatChannelUpdatedEvent,
} from '../shared/messaging-chat/model';
import {
    ChatChannelEventType,
    ChatChannelMember,
    ChatChannelUpdateReason,
    ChatMessage,
    ThreadMessageConsumptionStatus,
} from '../shared/messaging-chat/model';
import { BaseMessagingChatClient } from '../shared/messaging-chat/base-messaging-chat-client.service';
import type { ChatMessagingThreadsInfo } from '../shared/messaging-chat/messaging-inbox/model/chat-messaging-threads-info.model';
import { ChatMessagingThread } from '../shared/messaging-chat/messaging-inbox/model/chat-messaging-thread.model';
import { isDefined } from '../shared/utils';
import { ChatMessageAttributes } from '../shared/messaging-chat/model/chat-message-attributes.model';
import { fetchConversationDescriptors } from '../shared/messaging-chat/chat-utils';
import { BroadcastService } from './broadcast.service';

/**
 * Application-wide listener for chat (Twilio Conversations) activity.
 *
 * Once started, the service:
 *  - fetches the messaging threads from `/api/chat/message-threads` and caches
 *    threads, members and per-thread participants;
 *  - registers `messageAdded` and `conversationUpdated` listeners on the chat client;
 *  - tracks a per-channel {@link ThreadMessageConsumptionStatus} and publishes
 *    it through {@link perChannelMessageConsumptionStatus$};
 *  - broadcasts, via {@link BroadcastService}, whether the current user has
 *    any unconsumed message in a channel they participate in.
 */
@Injectable({
    providedIn: 'root',
})
export class MessagingChatListenerService {
    /** Emits a snapshot (copy) of the per-channel consumption map; starts with `null`. */
    perChannelMessageConsumptionStatus$ = new BehaviorSubject<
        Map<string, ThreadMessageConsumptionStatus>
    >(null);
    /** Emits when a conversation's attributes are updated; starts with `null`. */
    threadInfoUpdate$ = new BehaviorSubject<ThreadAttributeUpdatedEvent>(null);

    /** Live consumption status keyed by channel (conversation) sid. */
    private _perChannelMessageConsumptionStatus = new Map<
        string,
        ThreadMessageConsumptionStatus
    >();
    /** User ids participating in each channel, keyed by channel sid. */
    private channelParticipants = new Map<string, Set<number>>();

    /** Accumulated result of every thread fetch made during the current session. */
    private _threadsInfo?: ChatMessagingThreadsInfo;
    /** Known channel members keyed by user id. */
    private readonly members: Map<number, ChatChannelMember> = new Map();

    private threadsInfo$: Observable<[Client, ChatMessagingThreadsInfo]>;
    private isFetchingThreadsInfo = false;
    /**
     * Handle on the bootstrap subscription created by `startListening()`, kept
     * so that `stopListening()` can cancel it while the fetch is in flight.
     */
    private threadsInfoSubscription?: { unsubscribe(): void };

    /** Emits a `messageAdded` event for every message received from the chat client. */
    channelEvent$ = new Subject<ChatChannelEvent>();

    private isListening = false;

    /** The live (mutable) per-channel consumption map. */
    get perChannelMessageConsumptionStatus() {
        return this._perChannelMessageConsumptionStatus;
    }

    constructor(
        private chatClient: BaseMessagingChatClient,
        private broadcastService: BroadcastService,
        private http: HttpClient,
    ) {}

    /**
     * Starts listening to chat events. Does nothing when already listening.
     *
     * Resolves the chat client and the messaging threads in parallel; once both
     * are available it computes the initial unconsumed state and registers the
     * client listeners. On failure the service returns to the "not listening"
     * state so a later call can retry.
     */
    startListening(): void {
        if (this.isListening) {
            return;
        }

        this.isListening = true;
        this.isFetchingThreadsInfo = true;
        this.threadsInfo$ = forkJoin([
            this.chatClient.getClientAsync(),
            this.fetchMessagingThreads(),
        ])
            // share()'ing this observable to avoid firing separate http request to get the messaging threads
            // if threads are still being fetched by this listener service
            .pipe(share());

        this.threadsInfoSubscription = this.threadsInfo$
            .pipe(finalize(() => (this.isFetchingThreadsInfo = false)))
            .subscribe({
                next: ([client]) => {
                    this.computeInitialUnconsumedMessagesStateAsync();
                    this.setChatClientListeners(client);
                },
                error: () => {
                    // allow a subsequent startListening() call to retry
                    this.isListening = false;
                },
            });
    }

    /**
     * Stops listening and drops all cached per-session state.
     * Does nothing when the service is not listening.
     */
    stopListening(): void {
        if (!this.isListening) {
            return;
        }
        this.isListening = false;

        // Cancel a bootstrap that is still in flight; otherwise its callback
        // would register listeners on the client after it has been shut down.
        this.threadsInfoSubscription?.unsubscribe();
        this.threadsInfoSubscription = undefined;
        this.isFetchingThreadsInfo = false;

        this.chatClient.removeAllClientListeners();
        this.chatClient.shutdown();
        this._perChannelMessageConsumptionStatus.clear();
        this.channelParticipants.clear();
        this.members.clear();
        // Drop cached threads together with the other caches; keeping them would
        // re-seed channelParticipants from stale threads on the next merge.
        this._threadsInfo = undefined;
    }

    /**
     * Registers a user as a participant of a channel.
     *
     * @param chatChannelId sid of the channel
     * @param userId id of the participating user
     */
    addParticipant(chatChannelId: string, userId: number) {
        const threadParticipants =
            this.channelParticipants.get(chatChannelId) || new Set<number>();
        threadParticipants.add(userId);
        this.channelParticipants.set(chatChannelId, threadParticipants);
    }

    /**
     * Returns the messaging threads, reusing the request started by
     * `startListening()` when it is still in flight; otherwise issues a new one.
     */
    getMessagingThreadsAsync(): Observable<ChatMessagingThreadsInfo> {
        if (this.isFetchingThreadsInfo) {
            return this.threadsInfo$.pipe(map((result) => result[1]));
        }

        return this.fetchMessagingThreads();
    }

    /**
     * Creates or updates the consumption status of a channel.
     *
     * Both indexes are always written to the status, including when
     * `lastConsumedIndex` is `null`.
     *
     * @param channelSid sid of the channel
     * @param lastIndex index of the last message in the channel
     * @param lastConsumedIndex index of the last message consumed by the current user
     * @param lastMessageInfo when given, its author, date, body and media are
     *        stored as the channel's last-message information
     */
    updatePerChannelConsumptionStatus(
        channelSid: string,
        lastIndex: number,
        lastConsumedIndex: number,
        lastMessageInfo?: Message,
    ): void {
        let channelConsumptionStatus =
            this._perChannelMessageConsumptionStatus.get(channelSid);
        if (!channelConsumptionStatus) {
            channelConsumptionStatus = new ThreadMessageConsumptionStatus(
                lastIndex,
                lastConsumedIndex,
            );
            this._perChannelMessageConsumptionStatus.set(
                channelSid,
                channelConsumptionStatus,
            );
        }

        if (lastMessageInfo) {
            channelConsumptionStatus.setLastMessageInformation(
                lastMessageInfo.author,
                lastMessageInfo.dateCreated,
                lastMessageInfo.body,
                lastMessageInfo.media,
            );
            // media messages may have no body: fall back to the attachment
            // text carried in the message attributes
            if (
                !channelConsumptionStatus.lastMessageBody &&
                lastMessageInfo.type === 'media'
            ) {
                channelConsumptionStatus.lastMessageBody =
                    ChatMessageAttributes.of(
                        lastMessageInfo.attributes,
                    ).attachmentMessage;
            }
        }

        channelConsumptionStatus.lastMessageIndex = lastIndex;
        channelConsumptionStatus.lastConsumedMessageIndex = lastConsumedIndex;
    }

    /**
     * Fetches messaging threads from the backend, converts the payload into
     * model entities and merges it into the local cache.
     *
     * @param params optional query parameters forwarded to the HTTP request
     */
    fetchMessagingThreads(params?: any): Observable<ChatMessagingThreadsInfo> {
        const observable$ = this.http.get<ChatMessagingThreadsInfo>(
            `/api/chat/message-threads`,
            { params },
        );

        return this.mapChatMessagingThreadInfo(observable$).pipe(
            tap((threadsInfo) => {
                this.mergeThreadInfoData(threadsInfo);
            }),
        );
    }

    /** Registers the `messageAdded` and `conversationUpdated` listeners on the client. */
    private setChatClientListeners(client: Client): void {
        // watch new messages being sent
        client.addListener(
            ChatChannelEventType.messageAdded,
            (message: Message) => this.handleMessageAdded(message),
        );

        // watch consumption horizon change
        client.addListener(
            ChatChannelEventType.conversationUpdated,
            (event: ChatChannelUpdatedEvent) =>
                this.handleConversationUpdated(event),
        );
    }

    /**
     * Handles a message received from the chat client. When the author cannot
     * be resolved from the known members, the member is fetched first.
     */
    private handleMessageAdded(message: Message): void {
        const chatMessage = ChatMessage.fromNativeMessage(
            message,
            this.chatClient.userIdentity,
            this.members,
        );
        const channelSid = message.conversation.sid;

        if (chatMessage.author) {
            this.publishMessageAdded(chatMessage, message, channelSid, false);
            return;
        }

        // Author attribute is missing on a Message which means that a message has
        // been sent by the member who's just (re-)obtained channel membership.
        // In this case we need to resolve (fetch) this new member's info (e.g.
        // last / first name etc.)
        const thread = this.findMessageThreadChannelId(channelSid);
        if (!thread) {
            return;
        }

        this.chatClient
            .fetchChannelMembers([chatMessage.spUserId], thread.matchId)
            .pipe(filter((members) => members.length > 0))
            .subscribe((members) => {
                const [member] = members;
                member.identity = chatMessage.authorIdentity;
                chatMessage.author = member;
                this.members.set(member.userId, member);
                this.publishMessageAdded(
                    chatMessage,
                    message,
                    channelSid,
                    true,
                );
            });
    }

    /**
     * Emits the `messageAdded` channel event, refreshes the channel's
     * consumption status and, for messages authored by someone else,
     * re-evaluates and publishes the unconsumed indicators.
     */
    private publishMessageAdded(
        chatMessage: ChatMessage,
        message: Message,
        channelSid: string,
        hasNewMember: boolean,
    ): void {
        this.emitMessageAddedEvent(chatMessage, channelSid, hasNewMember);

        // updates current user channel information to update indicators and last message info
        // NOTE(review): passing null here overwrites the stored last consumed
        // index; confirm ThreadMessageConsumptionStatus expects that.
        this.updatePerChannelConsumptionStatus(
            channelSid,
            message.index ?? 0,
            null,
            message,
        );

        if (!this.equalsCurrentUserIdentity(message.author)) {
            this.computeAndEmitUnconsumedMessagesEvent();
            this.emitPerChannelUnconsumedMessagesStatus();
        }
    }

    /**
     * Handles a conversation update: a read-horizon change refreshes the
     * unconsumed indicators, otherwise an attributes change is published
     * through `threadInfoUpdate$`.
     */
    private handleConversationUpdated(event: ChatChannelUpdatedEvent): void {
        const { conversation, updateReasons } = event;

        if (
            updateReasons.includes(ChatChannelUpdateReason.lastReadMessageIndex)
        ) {
            this.updatePerChannelConsumptionStatus(
                conversation.sid,
                // a conversation without messages has no lastMessage
                conversation.lastMessage?.index,
                conversation.lastReadMessageIndex,
            );
            this.computeAndEmitUnconsumedMessagesEvent();
            this.emitPerChannelUnconsumedMessagesStatus();
        } else if (updateReasons.includes(ChatChannelUpdateReason.attributes)) {
            this.threadInfoUpdate$.next(
                new ThreadAttributeUpdatedEvent(
                    conversation.sid,
                    ChatChannelAttributes.of(conversation.attributes),
                ),
            );
        }
    }

    /**
     * Seeds the per-channel consumption map from the client's subscribed
     * conversations and publishes the resulting indicators.
     */
    private computeInitialUnconsumedMessagesStateAsync(): void {
        const subscribedConversationsPage$ = this.chatClient
            .getClientAsync()
            .pipe(switchMap((client) => client.getSubscribedConversations()));

        fetchConversationDescriptors(subscribedConversationsPage$).subscribe(
            (channelDescriptors) => {
                if (!this.isListening) {
                    // stopped while descriptors were loading: do not
                    // repopulate the caches that stopListening() cleared
                    return;
                }

                channelDescriptors.forEach((descriptor) => {
                    // stays undefined for a conversation without any message
                    let lastMessageIndex;
                    if (descriptor.lastMessage) {
                        lastMessageIndex =
                            descriptor.lastMessage.index > 0
                                ? descriptor.lastMessage.index
                                : 0;
                    }
                    this._perChannelMessageConsumptionStatus.set(
                        descriptor.sid,
                        new ThreadMessageConsumptionStatus(
                            lastMessageIndex,
                            descriptor.lastReadMessageIndex,
                        ),
                    );
                });
                this.emitPerChannelUnconsumedMessagesStatus();
                this.computeAndEmitUnconsumedMessagesEvent();
            },
        );
    }

    /** Recomputes the global unconsumed flag and broadcasts it. */
    private computeAndEmitUnconsumedMessagesEvent(): void {
        const hasUnconsumed =
            this.computeUnconsumedMessageStateThroughAllChannels();
        this.emitUnconsumedMessagesStatus(hasUnconsumed);
    }

    /**
     * @returns `true` when at least one channel that the current user
     *          participates in reports unconsumed messages
     */
    private computeUnconsumedMessageStateThroughAllChannels(): boolean {
        const currentUserId = ChatChannelMember.getSpUserIdFromIdentity(
            this.chatClient.userIdentity,
        );

        for (const [channelSid, status] of this
            ._perChannelMessageConsumptionStatus) {
            const participants = this.channelParticipants.get(channelSid);
            if (
                status.hasUnconsumed() &&
                participants &&
                participants.has(currentUserId)
            ) {
                return true;
            }
        }

        return false;
    }

    /** Broadcasts the global unconsumed flag through the broadcast service. */
    private emitUnconsumedMessagesStatus(hasUnconsumed: boolean): void {
        this.broadcastService.broadcastChatMessageAddedEvent(hasUnconsumed);
    }

    /** Publishes a snapshot of the per-channel consumption map. */
    private emitPerChannelUnconsumedMessagesStatus(): void {
        // we have to emit a clone of a perChannelMessageConsumptionStatus map,
        // otherwise BehaviorSubject emitting this value is going to keep
        // a reference to a Map and all updates to Map w/o emitting value
        // will be visible to new subscribers
        this.perChannelMessageConsumptionStatus$.next(
            new Map<string, ThreadMessageConsumptionStatus>(
                this._perChannelMessageConsumptionStatus,
            ),
        );
    }

    /** Emits a `messageAdded` event on `channelEvent$`. */
    private emitMessageAddedEvent(
        message: ChatMessage,
        channelSid: string,
        hasNewMember = false,
    ): void {
        this.channelEvent$.next({
            type: ChatChannelEventType.messageAdded,
            message,
            channelSid,
            hasNewMember,
        });
    }

    /** Replaces the participant set of every thread contained in `threadsInfo`. */
    private initPerThreadParticipants(
        threadsInfo: ChatMessagingThreadsInfo,
    ): void {
        threadsInfo.threads.forEach((thread) => {
            this.channelParticipants.set(
                thread.chatChannelId,
                new Set(thread.participantIds),
            );
        });
    }

    /**
     * Converts the raw members and threads of the payload into model
     * entities. The emitted object is the same instance, updated in place.
     */
    private mapChatMessagingThreadInfo(
        observable$: Observable<ChatMessagingThreadsInfo>,
    ): Observable<ChatMessagingThreadsInfo> {
        return observable$.pipe(
            // convert into entities
            map((threadsInfo) => {
                threadsInfo.members = threadsInfo.members.map(
                    (member) => new ChatChannelMember(member),
                );
                threadsInfo.threads = threadsInfo.threads.map(
                    (thread) => new ChatMessagingThread(thread),
                );

                return threadsInfo;
            }),
        );
    }

    /**
     * Looks up the cached thread backed by the given channel.
     *
     * @returns the thread, or `undefined` when it is unknown or nothing has
     *          been fetched yet
     */
    private findMessageThreadChannelId(
        channelSid: string,
    ): ChatMessagingThread | undefined {
        return this._threadsInfo?.threads.find(
            (thread) => thread.chatChannelId === channelSid,
        );
    }

    /**
     * Merges freshly fetched threads and members into the local cache and
     * rebuilds the per-thread participant sets.
     */
    private mergeThreadInfoData(threadsInfo: ChatMessagingThreadsInfo): void {
        if (!isDefined(this._threadsInfo)) {
            // first fetch of the session: adopt the payload as the cache
            this._threadsInfo = threadsInfo;
            this._threadsInfo.members.forEach((member) =>
                this.members.set(member.userId, new ChatChannelMember(member)),
            );
        } else {
            threadsInfo.threads.forEach((thread) => {
                const localThreadIndex = this._threadsInfo.threads.findIndex(
                    (localThread) => localThread.id === thread.id,
                );
                if (localThreadIndex > -1) {
                    this._threadsInfo.threads[localThreadIndex] = thread;
                    return;
                }

                this._threadsInfo.threads.push(thread);
                // Seed an empty status only for channels not tracked yet, so
                // that a status already populated for this channel (from the
                // conversation descriptors or an incoming message) is not
                // reset to 0 / 0.
                if (
                    !this._perChannelMessageConsumptionStatus.has(
                        thread.chatChannelId,
                    )
                ) {
                    this.updatePerChannelConsumptionStatus(
                        thread.chatChannelId,
                        0,
                        0,
                    );
                }
            });
            threadsInfo.members.forEach((member) => {
                const isKnownMember = this._threadsInfo.members.some(
                    (localMember) => localMember.userId === member.userId,
                );
                if (!isKnownMember) {
                    this._threadsInfo.members.push(member);
                }
                // the lookup map always receives the most recent member data
                this.members.set(member.userId, new ChatChannelMember(member));
            });
        }
        this.initPerThreadParticipants(this._threadsInfo);
    }

    /** @returns whether the identity belongs to the current chat user */
    private equalsCurrentUserIdentity(userIdentity: string): boolean {
        return this.chatClient.equalsCurrentUser(userIdentity);
    }
}
