import { isNil, isString } from 'lodash-es';
import {
    BehaviorSubject,
    EMPTY,
    Observable,
    ReplaySubject,
    Subject,
    combineLatest,
    concat,
    merge,
    of,
    pairwise
} from 'rxjs';
import {
    distinctUntilChanged,
    filter,
    finalize,
    last,
    map,
    shareReplay,
    startWith,
    switchMap,
    takeUntil
} from 'rxjs/operators';
import io, { Socket } from 'socket.io-client';
import { v4 as uuidv4 } from 'uuid';

import {
    Inject,
    Injectable,
    InjectionToken,
    NgZone,
    Optional,
    SkipSelf
} from '@angular/core';
import { WebsocketError } from '@mhp-immersive-exp/contracts/src/websocket/websocket-error';
import { WebsocketErrorCode } from '@mhp-immersive-exp/contracts/src/websocket/websocket-error-codes';
import { WebsocketMessage } from '@mhp-immersive-exp/contracts/src/websocket/websocket-message';
import { WebsocketPayload } from '@mhp-immersive-exp/contracts/src/websocket/websocket-payload';
import { IllegalStateError, lazyShareReplay, runInZone } from '@mhp/common';

import {
    WebsocketConnectionUnavailableError,
    WebsocketEndpointResponseError,
    WebsocketEndpointUnavailableError,
    WebsocketRequestError,
    WebsocketRequestTimeoutError
} from './errors/errors';
import { ServerCommunicationInterface } from './server-communication.interface';

/**
 * The configuration required to connect to a given sockJs-endpoint.
 */
export interface SocketIOServiceConfig {
    /**
     * URL to the sockJs-endpoint may be either provided statically (if it is known
     * beforehand and always stays the same) or as an observable stream emitting
     * the currently valid url. In the latter case, for every emitted value, a
     * possibly existing socket-instance is destroyed and a new one is created.
     */
    sockJsUrl: string | Observable<string | undefined>;

    /**
     * An optional roomId that should be joined once connected to the socket.
     * To change the room during runtime, use an Observable as roomId source.
     */
    roomId?: string | Observable<string | undefined>;
}

export const SOCKET_IO_SERVICE_CONFIG_TOKEN =
    new InjectionToken<SocketIOServiceConfig>('SocketIOService configuration');

export interface ConnectionInfo {
    url: string;
    roomId?: string;
    connected: boolean;
}

interface ConnectionInfoInternal extends ConnectionInfo {
    socket: Socket;
}

/**
 * Service to allow communication against a given endpoint, provided via
 * SOCKET_IO_SERVICE_CONFIG_TOKEN.
 * The service is provided in SocketIoModule instead of root to make it possible
 * to lazy-load socket-io related functionality only when it is required.
 */
@Injectable()
export class SocketIOService implements ServerCommunicationInterface {
    private socket?: Socket;

    private readonly connectionInfoSubject = new BehaviorSubject<
        ConnectionInfoInternal | undefined
    >(undefined);

    private readonly connectionInfo$: Observable<ConnectionInfo | undefined> =
        this.connectionInfoSubject.pipe(
            map((info) =>
                info
                    ? {
                          url: info.url,
                          roomId: info.roomId,
                          connected: info.connected
                      }
                    : undefined
            )
        );

    private readonly connectionState$ = this.connectionInfo$.pipe(
        map((info) => !!info?.connected)
    );

    constructor(
        private readonly ngZone: NgZone,
        @Inject(SOCKET_IO_SERVICE_CONFIG_TOKEN)
        socketIOServiceConfig: SocketIOServiceConfig,
        @Optional() @SkipSelf() parentService?: SocketIOService
    ) {
        if (parentService) {
            console.warn(
                'SocketIOService is already loaded. Import SocketIOModule only once where needed.'
            );
        }

        this.initSocketCreationLogic(socketIOServiceConfig);
    }

    /**
     * Issue an rpc-style request.
     * Note that the request is only sent when the returned Observable is being subscribed to.
     *
     * @param topic The topic to request.
     * @param payload The payload to forward along with the request.
     * @return Observable stream emitting the response or an error in case something went wrong.
     * @throws WebsocketConnectionUnavailableError In case the socket is not connected
     * @throws WebsocketEndpointUnavailableError In case the endpoint to be called is not available
     * @throws WebsocketRequestTimeoutError In case waiting for the response exceeded the hubs timeout setting
     * @throws WebsocketEndpointResponseError In case the endpoint could be called but returned an error
     */
    request<T, U>(topic: string, payload: T): Observable<U> {
        return new Observable<U>((subscriber: any) => {
            const subject = new ReplaySubject<{ data?: U | null }>(1);

            if (!this.socket?.connected) {
                subscriber.error(
                    new WebsocketConnectionUnavailableError(
                        `Failed issuing request [${topic}]: Socket is not connected.`
                    )
                );
                return undefined;
            }

            const { socket } = this;

            const traceId = uuidv4();

            try {
                // isNil checks for undefined or null. Therefore unknown type casting can be used, as all edge cases are handled!
                socket.emit(`req_${topic}`, {
                    data: isNil(payload) ? {} : payload,
                    traceId
                } as unknown);
            } catch (error: unknown) {
                subscriber.error(
                    new WebsocketRequestError(
                        `Failed issuing request [${topic}]`,
                        error as WebsocketError
                    )
                );
                return undefined;
            }

            const responseTopicName = `res_${topic}`;
            const socketListener = (data: unknown) => {
                // property is optional data?: ... T | null. Therefore types are T, undefined, null
                if (data && typeof data === 'object') {
                    if (
                        Object.prototype.hasOwnProperty.call(data, 'traceId') &&
                        (data as Record<string, unknown>).traceId !== traceId
                    ) {
                        return;
                    }

                    if (Object.prototype.hasOwnProperty.call(data, 'error')) {
                        const error = this.resolveWebsocketError(
                            (data as Record<string, unknown>)
                                .error as WebsocketError
                        );
                        subject.error(error);
                        return;
                    }
                    try {
                        subject.next(data);
                    } finally {
                        subject.complete();
                    }
                }
            };
            // listen for the response event. Unregistering from socket-event happens in Subject finalization
            socket.on(responseTopicName, socketListener);

            const connectionStateSubscription = this.connectionState$.subscribe(
                (connectionState) => {
                    if (!connectionState) {
                        subject.error(
                            new WebsocketConnectionUnavailableError(
                                `Failed getting response for request [${topic}]: Socket is not connected.`
                            )
                        );
                    }
                }
            );

            subject
                .pipe(
                    map((data) => data.data),
                    finalize(() => {
                        socket.off(responseTopicName, socketListener);
                        connectionStateSubscription.unsubscribe();
                    })
                )
                .subscribe(subscriber);

            return () => {
                if (!subject.closed) {
                    subject.complete();
                }
            };
        });
    }

    /**
     * Register an RPC endpoint.
     * Note that the endpoint is only registered when the returned Observable is being subscribed to and is unregistered when
     * unsubscribing from the returned Observable.
     *
     * When no socket-connection is available yet, it is waited until a socket-connection becomes available.
     * In case a socket-connection is changed (connection to a new endpoint), the registration is automatically renewed for
     * the new connection.
     *
     * @param endpoint The endpoint to register.
     * @param callback The callback being called when a request comes in. It is expected to throw EndpointResponseErrors only.
     * @return Observable stream emiting the state of the endpoint registration. Errors out in case anything unexpected is returned from the callback.
     * @throws IllegalStateError In case the callback throws an error other than EndpointResponseError.
     */
    registerEndpoint<
        T extends WebsocketPayload,
        U extends WebsocketPayload | void
    >(
        endpoint: string,
        callback: (requestPayload: T | null | undefined) => Promise<U>
    ): Observable<'REGISTERED' | 'UNREGISTERED' | 'PENDING'> {
        return this.connectionInfoSubject.pipe(
            map((connectionInfo) => connectionInfo?.socket),
            distinctUntilChanged(),
            switchMap(
                (
                    socket
                ): Observable<'REGISTERED' | 'UNREGISTERED' | 'PENDING'> => {
                    if (!socket) {
                        return of('PENDING');
                    }

                    return new Observable<
                        'REGISTERED' | 'UNREGISTERED' | 'PENDING'
                    >((subscriber) => {
                        if (!socket.connected) {
                            subscriber.next('PENDING');
                        }

                        const requestName = `req_${endpoint}`;
                        const responseName = `res_${endpoint}`;

                        const requestSubscription = this.subscribeToTopicRaw<
                            WebsocketMessage<T>
                        >(socket, requestName).subscribe(
                            async (requestMessage) => {
                                let response: U | null = null;
                                let caughtError: WebsocketEndpointResponseError | null =
                                    null;
                                try {
                                    response = await callback(
                                        requestMessage.data
                                    );
                                } catch (error: unknown) {
                                    if (
                                        !(
                                            error instanceof
                                            WebsocketEndpointResponseError
                                        )
                                    ) {
                                        subscriber.error(
                                            new IllegalStateError(
                                                `Endpoint callbacks are expected to throw EndpointResponseErrors only but got ${typeof error}`,
                                                error
                                            )
                                        );
                                    }
                                    caughtError =
                                        error as WebsocketEndpointResponseError;
                                }

                                socket.emit(responseName, <WebsocketMessage<U>>{
                                    traceId: requestMessage.traceId,
                                    data: response,
                                    error: caughtError?.details || null
                                });
                            }
                        );

                        const connectionStateSubsription =
                            this.getConnectionState$().subscribe(
                                (connected) => {
                                    if (connected) {
                                        // FIXME: this is not expected behavior from hub-side. It should not be required to unregister when socket was disconnected.
                                        socket.emit('unregister', endpoint);
                                        socket.emit('register', endpoint);
                                        subscriber.next('REGISTERED');
                                    } else {
                                        subscriber.next('UNREGISTERED');
                                    }
                                }
                            );

                        return () => {
                            requestSubscription.unsubscribe();
                            connectionStateSubsription.unsubscribe();
                            socket.emit('unregister', endpoint);
                        };
                    });
                }
            )
        );
    }

    /**
     * Emit an event.
     *
     * @param topic The topic to emit to.
     * @param payload The payload to emit.
     * @throws WebsocketConnectionUnavailableError In case the socket is not connected.
     */
    event<T>(topic: string, payload: T) {
        if (!this.socket?.connected) {
            throw new WebsocketConnectionUnavailableError(
                `Failed emitting to topic [${topic}]: Socket is not connected.`
            );
        }
        // FIXME: Handle case when emit didn't reach the server and allow the caller to react. Ack on server?
        this.socket.emit(`evt_${topic}`, { data: payload });
    }

    /**
     * Subscribe to the given topic.
     * When no socket-connection is available yet, it is waited until a socket-connection becomes available.
     * In case a socket-connection is changed (connection to a new endpoint), the subscription is renewed for
     * the new connection.
     * @param topic The topic to subscribe to.
     */
    subscribe<T>(topic: string): Observable<T> {
        return this.connectionInfoSubject.pipe(
            map((connectionInfo) => connectionInfo?.socket),
            distinctUntilChanged(), // only when the underlying socket changes,
            switchMap((socket) => {
                if (!socket) {
                    return EMPTY;
                }

                return new Observable<T>((subscriber) => {
                    const subject = new Subject<{ data: T }>();
                    const eventTopicName = `evt_${topic}`;
                    const socketListener = (data: { data: T }) => {
                        subject.next(data);
                    };

                    socket.on(eventTopicName, socketListener);
                    this.getConnectionState$()
                        .pipe(
                            filter((connected) => connected),
                            takeUntil(concat(of({}), subject).pipe(last()))
                        )
                        .subscribe(() => {
                            socket.emit('subscribe', topic);
                        });

                    subject
                        .pipe(map((data) => data.data))
                        .subscribe(subscriber);

                    return () => {
                        socket.off(eventTopicName, socketListener);
                        socket.emit('unsubscribe', topic);
                        subject.complete();
                    };
                });
            })
        );
    }

    /**
     * Subscribe to the given topic.
     * @param socket The socket to use for subscribing.
     * @param topic The topic to subscribe to.
     */
    private subscribeToTopicRaw<T>(
        socket: Socket,
        topic: string
    ): Observable<T> {
        return new Observable<T>((subscriber) => {
            const eventTopicName = topic;
            const socketListener = (data: T) => {
                subscriber.next(data);
            };

            socket.on(eventTopicName, socketListener);

            return () => {
                socket.off(eventTopicName, socketListener);
            };
        });
    }

    /**
     * Gets an observable stream emitting the ConnectionInfo of the socket-connection.
     */
    getConnectionInfo$(): Observable<ConnectionInfo | undefined> {
        return this.connectionInfo$;
    }

    /**
     * Gets an observable stream emitting the connection state of the socket-connection.
     */
    getConnectionState$(): Observable<boolean> {
        return this.connectionState$;
    }

    private resolveWebsocketError(error: WebsocketError) {
        if (error.code === WebsocketErrorCode.HUB_WEBSOCKET_NO_PUBLISHER) {
            return new WebsocketEndpointUnavailableError(error.message, error);
        }
        if (error.code === WebsocketErrorCode.HUB_WEBSOCKET_TIMEOUT) {
            return new WebsocketRequestTimeoutError(error.message, error);
        }
        return new WebsocketEndpointResponseError(error.message, error);
    }

    private initSocketCreationLogic(
        socketIOServiceConfig: SocketIOServiceConfig
    ) {
        let sockJsUrl$: Observable<string | undefined>;
        let roomId$: Observable<string | undefined> = of(undefined);

        if (isString(socketIOServiceConfig.sockJsUrl)) {
            sockJsUrl$ = of(socketIOServiceConfig.sockJsUrl);
        } else {
            sockJsUrl$ = socketIOServiceConfig.sockJsUrl;
        }

        if (socketIOServiceConfig.roomId) {
            if (isString(socketIOServiceConfig.roomId)) {
                roomId$ = of(socketIOServiceConfig.roomId);
            } else {
                roomId$ = socketIOServiceConfig.roomId.pipe(
                    distinctUntilChanged(),
                    shareReplay(1)
                );
            }
        }

        sockJsUrl$
            .pipe(
                // ensure we're entering ngZone here
                runInZone(this.ngZone),
                switchMap((sockJsUrl) => {
                    if (!sockJsUrl) {
                        return EMPTY;
                    }
                    return new Observable(() => {
                        if (!sockJsUrl) {
                            return undefined;
                        }
                        // init new socket and return the cleanup-functionm
                        return this.createSocket(sockJsUrl, roomId$);
                    });
                })
            )
            .subscribe();
    }

    private createSocket(
        sockJsUrl: string,
        roomId$: Observable<string | undefined>
    ): () => void {
        const socket = io(sockJsUrl, {
            transports: ['websocket']
        });

        socket.on('reconnect_attempt', () => {
            socket.io.opts.transports = ['polling', 'websocket'];
        });

        this.socket = socket;

        const prevAndCurrentRoomId$ = roomId$.pipe(
            startWith(undefined),
            pairwise(),
            lazyShareReplay()
        );

        // crate an observable emitting the current socket connection state
        const socketConnectionState$: Observable<
            'initialized' | 'connect' | 'disconnect'
        > = merge(
            of(<{ event: 'initialized' }>{ event: 'initialized' }),
            this.fromSocketEvent$(socket, 'connect'),
            this.fromSocketEvent$(socket, 'disconnect')
        ).pipe(map((event) => event.event));

        const roomHandlingSubscription = combineLatest([
            socketConnectionState$,
            prevAndCurrentRoomId$
        ]).subscribe(([connectionState, prevAndCurrentRoomId]) => {
            const [prevRoomId, currentRoomId] = prevAndCurrentRoomId;
            if (connectionState === 'connect') {
                // potentially leave the previous room
                if (prevRoomId) {
                    socket.emit('leave', prevRoomId);
                }
                // potentially join the current room
                if (currentRoomId) {
                    socket.emit('join', currentRoomId);
                }
            }
            // update socket state on every state change
            this.onSocketStateChange(sockJsUrl, currentRoomId);
        });

        return () => {
            // cleanup
            roomHandlingSubscription.unsubscribe();
            socket.disconnect();
            this.socket = undefined;

            this.onSocketStateChange();
        };
    }

    private onSocketStateChange(sockJsUrl?: string, roomId?: string) {
        if (!this.socket || !sockJsUrl) {
            this.connectionInfoSubject.next(undefined);
            return;
        }

        this.connectionInfoSubject.next({
            socket: this.socket,
            url: sockJsUrl,
            roomId,
            connected: !!this.socket?.connected
        });
    }

    private fromSocketEvent$<E extends string, D = unknown>(
        socket: Socket,
        event: E
    ): Observable<{
        event: E;
        data?: D;
    }> {
        return new Observable((subscriber) => {
            socket.on(<string>event, (data) => {
                subscriber.next({
                    event,
                    data
                });
            });

            return () => {
                socket.off(event);
            };
        });
    }
}
