/* eslint-disable max-classes-per-file */
import { isEmpty, isNumber } from 'lodash-es';
import {
    BehaviorSubject,
    Observable,
    Subject,
    defer,
    from,
    of,
    throwError
} from 'rxjs';
import {
    catchError,
    distinctUntilChanged,
    map,
    switchMap,
    tap
} from 'rxjs/operators';

import { Inject, Injectable, NgZone, Optional } from '@angular/core';
import { StreamingService as MonkeywayStreamingService } from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/streaming/streaming.js';
import { Specs } from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/base.js';
import {
    ConnectionCutError,
    ConnectionCutType,
    QuotaCheckError,
    SessionNotAvailableError,
    SessionNotFoundError
} from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/errors.js';
import { TwilioTokenData } from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/session-data.js';
import { SessionOptions } from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/session-options.js';
import { StreamInfo } from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/stream-info.js';
import type {
    CutStreamLimits,
    StreamLimits
} from '@mhp-immersive-exp/sdk/streaming/monkeyway/internal/types/stream-options.js';
import { CustomError, IllegalStateError, runInZone } from '@mhp/common';

import { ApplicationStateService } from '../application-state';
import { ErrorHandlerService } from '../error-handler';
import { StreamStatus, setEngineHostUrl, setStreamStatus } from './state';
import {
    ExtendedCutStreamLimits,
    MONKEYWAY_ENVIRONMENT_CONFIG_TOKEN,
    MonkeywayEnvironmentConfig
} from './streaming.config';

/**
 * Options accepted by `StreamingService.startStream$` to configure the
 * stream-session that should be started or joined.
 */
export interface StreamSessionOptions {
    /**
     * ID for a dedicated session to be joined. When omitted, an "ad-hoc"
     * session is started. A value with exactly 5 characters is parsed as a
     * numeric connection code, any other value is used as a connection key.
     */
    sessionCode?: string;
    /** Forwarded as-is to the underlying streaming SDK. */
    useAudio?: boolean;
    /**
     * Stream limits for this session. When omitted, the `streamLimits` of the
     * injected environment config are used (if any).
     */
    streamLimits?: StreamLimits;
    /**
     * Limits used to derive the SDK's cut-stream configuration. When omitted,
     * the `cutStreamLimits` of the injected environment config are used (if any).
     */
    cutStreamLimits?: ExtendedCutStreamLimits;

    /** Optional session-specs that should be attached to the session and might
     * be used for reporting functionality.
     * Note that these can only be used when starting an "ad-hoc" session. When
     * joining a session by providing a sessionCode, the sessionSpecs need to be
     * already set when doing the session-reservation.
     */
    sessionSpecs?: Specs;
}

/**
 * Snapshot of the stream's connection state as emitted by
 * `StreamingService.getStreamStatus$`.
 */
export interface StreamStatusInfo {
    /** Current connection status of the stream. */
    status: StreamStatus;
    /**
     * Info about the stream. Set once the stream is connected and carried over
     * while a reconnect is attempted after an error; absent otherwise.
     */
    streamInfo?: StreamInfo;
}

/**
 * Information about a streaming-session: the SDK's `StreamInfo` without its
 * `stream` and `specs` properties.
 */
export type StreamSessionInfo = Omit<StreamInfo, 'stream' | 'specs'>;

/**
 * Error thrown in case an error occurs while working with a stream.
 * Base class of all stream-related errors declared in this file.
 */
export class StreamError extends CustomError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload, passed through to `CustomError`
     * (in this file usually the originally caught error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * Payload emitted on `StreamingService.getStreamErrors$` for every error that
 * occurs while a stream is being started, is active or is being stopped.
 */
export interface StreamErrorInfo {
    /** The error that occurred, already mapped to a `StreamError`. */
    error: StreamError;
    /** Information about how the service reacts to the error. */
    context: {
        /** Whether the service will try to (re-)connect after this error. */
        willRetry: boolean;
        /**
         * Set from the retry-context's `nextRetryAttempt`.
         * NOTE(review): presumably the delay until the next attempt; unit is
         * not visible from this file - confirm against ErrorHandlerService.
         */
        willRetryIn?: number;
    };
}

/**
 * Error signalling that the connection to the stream was lost.
 * In this file it is created when an SDK error carries the message
 * 'RTC connection lost'.
 */
export class StreamConnectionError extends StreamError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * Error signalling that the stream connection has been cut.
 * In this file it is created from the SDK's `ConnectionCutError`; the
 * `connectionCutType` mirrors the SDK's cut-type, with 'UNKNOWN' used for
 * any cut-type that is not explicitly mapped.
 */
export class StreamConnectionCutError extends StreamError {
    /**
     * @param message Human-readable description of the failure.
     * @param connectionCutType The reason category why the connection was cut.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(
        message: string,
        public readonly connectionCutType:
            | 'UNKNOWN'
            | 'BITRATE'
            | 'LATENCY'
            | 'FRAMES',
        details?: any
    ) {
        super(message, details);
    }
}

/**
 * Error signalling that no renderer is available for the stream.
 * In this file it is created from the SDK's `SessionNotAvailableError`.
 */
export class StreamRendererUnavailableError extends StreamError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * No stream available because the underlying quota has been exceeded.
 * In this file it is created from the SDK's `SessionNotAvailableError` when
 * its `errorCode` is 'E_NO_QUOTA'.
 */
export class StreamRendererUnavailableQuotaExceededError extends StreamRendererUnavailableError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * Thrown in case a session, identified via a session-code or -key, could
 * not be found.
 * In this file it is created from the SDK's `SessionNotFoundError`.
 */
export class StreamSessionNotFoundError extends StreamError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * Thrown in case a quota-check failed.
 * In this file it is created from the SDK's `QuotaCheckError`, re-using its
 * message.
 */
export class StreamQuotaCheckError extends StreamError {
    /**
     * @param message Human-readable description of the failure.
     * @param details Optional additional payload (usually the original error).
     */
    constructor(message: string, details?: any) {
        super(message, details);
    }
}

/**
 * Service providing access to a stream-session.
 * Currently, only one session is supported at a time.
 *
 * The service wraps the Monkeyway streaming SDK, exposes the connection
 * status and occurring errors as observables and - when an
 * ApplicationStateService is available - mirrors the status and the engine
 * host URL into the application state.
 */
@Injectable()
export class StreamingService {
    private readonly mwStreamingService: MonkeywayStreamingService;

    private readonly streamStatusSubject =
        new BehaviorSubject<StreamStatusInfo>({
            status: StreamStatus.DISCONNECTED
        });

    private readonly streamStatus$ = this.streamStatusSubject.asObservable();

    private readonly streamingErrorsSubject = new Subject<StreamErrorInfo>();

    private readonly streamingErrors$ =
        this.streamingErrorsSubject.asObservable();

    constructor(
        @Inject(MONKEYWAY_ENVIRONMENT_CONFIG_TOKEN)
        private readonly streamingServiceConfig: MonkeywayEnvironmentConfig,
        private readonly errorHandlerService: ErrorHandlerService,
        private readonly ngZone: NgZone,
        @Optional()
        private readonly applicationStateService?: ApplicationStateService
    ) {
        this.mwStreamingService = new MonkeywayStreamingService({
            baseUrl: streamingServiceConfig.baseUrl,
            appEnvId: streamingServiceConfig.appEnvId,
            apiKey: streamingServiceConfig.apiKey
        });
        // the application-state is optional, only bind to it when provided
        if (applicationStateService) {
            this.initApplicationStateBindings(applicationStateService);
        }
    }

    /**
     * Get a stream emitting the twilio-token for the currently active session.
     * Emits undefined while no session-id is known. The token is requested
     * again whenever the session-id changes.
     */
    getTwilioToken$(): Observable<TwilioTokenData | undefined> {
        return this.getStreamStatus$().pipe(
            map((streamStatus) => streamStatus.streamInfo?.session?.id),
            distinctUntilChanged(),
            switchMap((sessionId) => {
                if (!sessionId) {
                    return of(undefined);
                }
                return this.mwStreamingService.getTwilioToken(sessionId);
            })
        );
    }

    /**
     * Start connecting to a stream.
     * The connection stays active until the subscriber unsubscribes.
     * Errors occurring while connecting are reported via getStreamErrors$()
     * and the connection attempt is retried.
     *
     * @param streamOptions Options describing the session to start or join.
     * @throws StreamError Synchronously, in case a session is already
     * connecting or connected.
     * @throws IllegalStateError Synchronously, in case both a sessionCode and
     * non-empty sessionSpecs are provided.
     */
    startStream$(streamOptions: StreamSessionOptions): Observable<StreamInfo> {
        if (
            this.streamStatusSubject.value.status !==
                StreamStatus.DISCONNECTED &&
            this.streamStatusSubject.value.status !== StreamStatus.DISCONNECTING
        ) {
            throw new StreamError('Session already started');
        }

        if (
            streamOptions?.sessionCode &&
            !isEmpty(streamOptions?.sessionSpecs)
        ) {
            throw new IllegalStateError(
                'sessionSpecs should only be provided when creating an ad-hoc session, not when joining an existing session using a sessionCode.'
            );
        }

        return new Observable<StreamInfo>((subscriber) => {
            this.streamStatusSubject.next({
                status: StreamStatus.CONNECTING
            });

            // explicitly given limits take precedence over the configured ones
            let streamLimits: StreamLimits | undefined;
            if (streamOptions.streamLimits) {
                streamLimits = {
                    ...streamOptions.streamLimits
                };
            } else if (this.streamingServiceConfig.streamLimits) {
                streamLimits = {
                    ...this.streamingServiceConfig.streamLimits
                };
            }

            const extendedCutStreamLimits: ExtendedCutStreamLimits | undefined =
                streamOptions.cutStreamLimits ??
                this.streamingServiceConfig.cutStreamLimits ??
                undefined;
            const extendedStreamingEventHandler =
                extendedCutStreamLimits?.handlerFactory?.buildExtendedStreamingEventHandler();

            // translate the extended limits to the shape expected by the SDK
            const cutStreamLimits: CutStreamLimits | undefined =
                extendedCutStreamLimits
                    ? {
                          latency: extendedCutStreamLimits.latency,
                          bitrate: extendedCutStreamLimits.bitrate,
                          // the SDK only gets a flag telling whether a ratio is configured
                          noFrames: isNumber(
                              extendedCutStreamLimits.noFramesRatio
                          ),
                          handler: extendedStreamingEventHandler?.getHandler()
                      }
                    : undefined;

            defer(() => {
                // Built inside the factory so every connection attempt hands a
                // fresh options object to the SDK.
                const connectionOptions = {
                    useAudio: streamOptions.useAudio,
                    streamLimits,
                    cutStreamLimits
                };
                const { sessionCode } = streamOptions;

                if (!sessionCode) {
                    // no session to join: start an ad-hoc session
                    return this.mwStreamingService.start(
                        {
                            participants: 1,
                            sessionSpecs: {
                                ...streamOptions.sessionSpecs
                            }
                        },
                        connectionOptions
                    );
                }
                if (sessionCode.length === 5) {
                    // 5-character codes are treated as numeric connection codes
                    return this.mwStreamingService.startWithConnectionCode(
                        Number.parseInt(sessionCode, 10),
                        connectionOptions
                    );
                }
                return this.mwStreamingService.startWithConnectionKey(
                    sessionCode,
                    connectionOptions
                );
            })
                .pipe(
                    runInZone(this.ngZone),
                    map((streamInfo: StreamInfo): StreamInfo => {
                        // this is more or less a hack, as a nullish stream-info might be emitted in an error-case..
                        if (!streamInfo) {
                            throw new StreamError(
                                'Connection to stream failed due to unknown reasons'
                            );
                        }
                        return streamInfo;
                    }),
                    this.errorHandlerService.applyRetry({
                        // FIXME: for appropriate error-handling, the streaming-lib needs to emit proper errors
                        isEligibleForRetry: () => true,
                        errorCallback: (error, retryContext) => {
                            // reset the streaming-event handler
                            extendedStreamingEventHandler?.reset();

                            // emit on error-stream
                            this.streamingErrorsSubject.next({
                                error:
                                    error instanceof StreamError
                                        ? error
                                        : this.mapMonkeywayError(error),
                                context: {
                                    willRetry:
                                        (retryContext.nextRetryAttempt || 0) >
                                        0,
                                    willRetryIn: retryContext.nextRetryAttempt
                                }
                            });

                            // update stream status, keeping the last known stream-info
                            this.streamStatusSubject.next({
                                status: StreamStatus.CONNECTING,
                                streamInfo:
                                    this.streamStatusSubject.value.streamInfo
                            });
                        },
                        maxRetries: Number.POSITIVE_INFINITY
                    }),
                    tap((streamInfo) => {
                        this.streamStatusSubject.next({
                            status: StreamStatus.CONNECTED,
                            streamInfo
                        });
                    })
                )
                .subscribe(subscriber);

            return async () => {
                // cleanup
                // NOTE(review): startStream$ may be called again while the status
                // is DISCONNECTING; the DISCONNECTED status emitted below would then
                // overwrite the status of the newly started stream - confirm whether
                // this can happen in practice.
                try {
                    this.streamStatusSubject.next({
                        status: StreamStatus.DISCONNECTING
                    });
                    extendedStreamingEventHandler?.destroy();
                    await this.mwStreamingService.stop();
                } catch (error) {
                    const streamError = new StreamError(
                        'Failed stopping stream',
                        error
                    );
                    this.streamingErrorsSubject.next({
                        error: streamError,
                        context: {
                            willRetry: false
                        }
                    });
                } finally {
                    this.streamStatusSubject.next({
                        status: StreamStatus.DISCONNECTED
                    });
                }
            };
        });
    }

    /**
     * Get a stream emitting the current status of the stream.
     * This stream never finishes.
     */
    getStreamStatus$(): Observable<StreamStatusInfo> {
        return this.streamStatus$;
    }

    /**
     * Get a stream emitting errors that occur while streaming session is active.
     */
    getStreamErrors$(): Observable<StreamErrorInfo> {
        return this.streamingErrors$;
    }

    /**
     * Checks if there is enough quota available for starting a stream.
     * @param sessionOptions Options for starting a session.
     * @returns A stream emitting the truthiness of the SDK's quota-check result.
     * @throws StreamQuotaCheckError In case the quota could not be checked.
     */
    checkQuota$(sessionOptions: SessionOptions): Observable<boolean> {
        return from(this.mwStreamingService.checkQuota(sessionOptions)).pipe(
            catchError((error) =>
                throwError(() => this.mapMonkeywayError(error))
            ),
            map((result) => !!result)
        );
    }

    /**
     * Extracts the URL where the remote engine can be reached.
     * @param streamInfo The StreamInfo to get the publicDomainName from
     * @returns The https-URL built from the publicDomainName, or undefined in
     * case the stream-info carries no specs or no publicDomainName.
     */
    extractApplicationBaseUrl(streamInfo: StreamInfo): string | undefined {
        const publicDomainName = streamInfo?.specs?.publicDomainName;
        // avoid building a bogus "https://undefined" URL
        if (!publicDomainName) {
            return undefined;
        }

        return `https://${publicDomainName}`;
    }

    /**
     * Mirrors every stream-status update into the application state: the
     * status itself and the engine host URL derived from the stream-info
     * (undefined when no stream-info is available).
     */
    private initApplicationStateBindings(
        applicationStateService: ApplicationStateService<any>
    ) {
        // bind stream status
        this.getStreamStatus$().subscribe((streamStatusInfo) => {
            applicationStateService.dispatch(
                setStreamStatus({
                    status: streamStatusInfo.status
                })
            );
            if (streamStatusInfo.streamInfo) {
                applicationStateService.dispatch(
                    setEngineHostUrl({
                        engineHostUrl: this.extractApplicationBaseUrl(
                            streamStatusInfo.streamInfo
                        )
                    })
                );
            } else {
                applicationStateService.dispatch(
                    setEngineHostUrl({
                        engineHostUrl: undefined
                    })
                );
            }
        });
    }

    /**
     * Maps an error raised by the streaming SDK to the matching StreamError
     * subclass. Anything that cannot be classified (including nullish values)
     * results in a generic StreamError wrapping the original value.
     * @param error The value that has been thrown / rejected by the SDK.
     */
    private mapMonkeywayError(error: any): StreamError {
        if (error instanceof SessionNotAvailableError) {
            if (error.errorCode === 'E_NO_QUOTA') {
                return new StreamRendererUnavailableQuotaExceededError(
                    'No renderer available due to quota exceeded',
                    error
                );
            }
            return new StreamRendererUnavailableError(
                'No renderer available',
                error
            );
        }
        if (error instanceof SessionNotFoundError) {
            return new StreamSessionNotFoundError(
                'Requested session could not be found',
                error
            );
        }
        if (error instanceof ConnectionCutError) {
            switch (error.cutType) {
                case ConnectionCutType.BITRATE:
                    return new StreamConnectionCutError(
                        error.message,
                        'BITRATE',
                        error
                    );
                case ConnectionCutType.LATENCY:
                    return new StreamConnectionCutError(
                        error.message,
                        'LATENCY',
                        error
                    );
                case ConnectionCutType.FRAMES:
                    return new StreamConnectionCutError(
                        error.message,
                        'FRAMES',
                        error
                    );
                default:
                    return new StreamConnectionCutError(
                        'Connection to stream was lost',
                        'UNKNOWN',
                        error
                    );
            }
        }
        if (error instanceof QuotaCheckError) {
            return new StreamQuotaCheckError(error.message, error);
        }
        // the SDK may reject with a nullish value, so access the message defensively
        if (error?.message === 'RTC connection lost') {
            return new StreamConnectionError(error.message, error);
        }
        return new StreamError('Connection to stream failed', error);
    }
}
