import { Injectable } from '@angular/core';
import { HttpXsrfTokenExtractor } from '@angular/common/http';
import { Observable, Observer, Subscription, BehaviorSubject } from 'rxjs';
import { filter, share } from 'rxjs/operators';
import { Client, Message, StompSubscription } from '@stomp/stompjs';

import * as SockJS from 'sockjs-client';

import { StompState } from './stomp/stomp.models';
import { StompConfig } from './stomp/stomp.config';
import { StompHeaders } from './stomp/stomp-headers';


/******************************************************************************
 *
 * Based on:
 * https://github.com/stomp-js/ng2-stompjs
 * version 0.4.2
 *
 * last updated 16.12.2020
 *
 *****************************************************************************/


/**
 * Angular STOMP Service using @stomp/stompjs
 *
 * @description This service handles subscribing to a message queue using the stomp.js library,
 * and returns values via the ES6 Observable specification for asynchronous value streaming by
 * wiring the STOMP messages into an observable.
 */

@Injectable({
	providedIn: 'root'
})
export class StompService {

	/** Delay, in milliseconds, before a reconnect is attempted after a STOMP error. */
	private static readonly RECONNECT_DELAY_MS = 5000;

	/**
	 * State of the STOMPService
	 *
	 * It is a BehaviorSubject and will emit current status immediately. This will typically get
	 * used to show current status to the end user.
	 */
	public state: BehaviorSubject<StompState>;

	/**
	 * Will trigger when connection is established. Use this to carry out initialization.
	 * It will trigger every time a (re)connection occurs. If it is already connected it
	 * will trigger immediately. You can safely ignore the value, as it will always be
	 * StompState.OPEN
	 */
	public connectObservable: Observable<StompState>;

	/**
	 * Internal array to hold locally queued messages when STOMP broker is not connected.
	 */
	protected queuedMessages: { queueName: string, message: string, headers: StompHeaders }[] = [];

	/**
	 * STOMP Client from @stomp/stompjs (null/undefined while no client exists).
	 * Kept as `any` so subclasses that assign their own client keep compiling.
	 */
	protected client: any;

	/** Handle of the pending reconnect timer started by onStompError, if any. */
	private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

	/**
	 * Constructor
	 *
	 * See README and samples for configuration examples
	 *
	 * @param config broker URL and connect headers used for every connection
	 * @param tokenExtractor source of the XSRF token sent in the connect headers
	 */
	public constructor(public readonly config: StompConfig, private tokenExtractor: HttpXsrfTokenExtractor) {
		this.state = new BehaviorSubject<StompState>(StompState.CLOSED);

		this.connectObservable = this.state.pipe(filter((currentState: number) => currentState === StompState.OPEN));
	}

	/**
	 * Initialize STOMP Client
	 *
	 * Creates a fresh client that opens a SockJS socket to `config.brokerURL` and sends
	 * `config.headers` (plus the XSRF token, when one is available) as connect headers.
	 *
	 * @param token optional auth token; when given it is stored in `config.headers` as
	 * 'X-AUTH-TOKEN', so later reconnects (which pass no token) keep using it
	 */
	protected initStompClient(token?: string): void {
		this.client = new Client();
		this.client.webSocketFactory = () => new SockJS(`${this.config.brokerURL}`, null, null);
		if (token) {
			this.config.headers['X-AUTH-TOKEN'] = token;
		}
		// getToken() may return null; omit the header instead of sending a null value.
		const xsrfToken = this.tokenExtractor.getToken();
		this.client.connectHeaders = {
			...this.config.headers,
			...(xsrfToken ? {'X-XSRF-TOKEN': xsrfToken} : {})
		};
		this.client.debug = this.debug;
	}

	/**
	 * Perform connection to STOMP broker
	 *
	 * Does nothing when a client is already connected. Otherwise any previous client that is
	 * still around (e.g. still trying to connect) and any scheduled reconnect are dropped
	 * first, so that at most one client is active at a time.
	 *
	 * @param token optional auth token, see initStompClient
	 * @returns the state subject, so callers can follow the connection progress
	 */
	public connect(token?: string): Observable<StompState> {
		if (this.client && this.client.connected) {
			return this.state;
		}

		// An explicit connect supersedes a scheduled reconnect and a half-open previous client.
		this.cancelReconnect();
		this.discardClient();

		this.initStompClient(token);

		this.debug('Connecting...');
		this.state.next(StompState.OPENING);

		this.client.onConnect = () => this.onConnect();
		// Forward the error frame, otherwise the handler can only ever log `undefined`.
		this.client.onStompError = (frame: unknown) => this.onStompError(frame);
		this.client.activate();

		return this.state;
	}

	/**
	 * Disconnect the connection to the STOMP broker and clean up,
	 * not sure how this method will get called, if ever.
	 * Call this method only if you know what you are doing.
	 *
	 * Also stops a client that is still connecting and cancels a scheduled reconnect,
	 * so the service does not come back to life after an explicit disconnect.
	 */
	public disconnect(): void {
		const reconnectWasPending = this.cancelReconnect();
		if (!this.client && !reconnectWasPending) {
			return;
		}

		// Notify observers that we are disconnecting!
		this.state.next(StompState.CLOSING);
		this.discardClient();
		this.state.next(StompState.CLOSED);
	}

	/**
	 * The current connection status with the STOMP broker
	 * @returns true when the last published state is StompState.OPEN
	 */
	public connected(): boolean {
		return this.state.getValue() === StompState.OPEN;
	}

	/**
	 * Send a message to a named destination. The message must be string.
	 *
	 * The message will get locally queued if the STOMP broker is not connected. Attempt
	 * will be made to publish queued messages as soon as the broker gets connected.
	 *
	 * @param queueName destination to publish to
	 * @param message message body
	 * @param headers additional STOMP headers for the frame
	 */
	public publish(queueName: string, message: string, headers: StompHeaders = {}): void {
		// Also ask the client itself: `state` may still say OPEN while the socket is down.
		if (this.connected() && this.client && this.client.connected) {
			// `Client` has no `send()` (that is CompatClient API); `publish()` is the equivalent.
			this.client.publish({destination: queueName, headers: headers, body: message});
		} else {
			this.debug(`Not connected, queueing ${message}`);
			this.queuedMessages.push({queueName: queueName, message: message, headers: headers});
		}
	}

	/**
	 * Send queued messages
	 *
	 * The queue is emptied first; messages that still cannot be sent are re-queued by publish().
	 */
	protected sendQueuedMessages(): void {
		const pendingMessages = this.queuedMessages;
		this.queuedMessages = [];

		if (pendingMessages.length === 0) {
			return;
		}

		this.debug(`Will try sending queued messages ${JSON.stringify(pendingMessages)}`);

		for (const pending of pendingMessages) {
			this.debug(`Attempting to send ${JSON.stringify(pending)}`);
			this.publish(pending.queueName, pending.message, pending.headers);
		}
	}

	/**
	 * Subscribe to server message queues
	 *
	 * This method can safely be called even when STOMP broker is not connected. Further
	 * if the underlying STOMP connection drops and reconnects, it will resubscribe transparently.
	 *
	 * The headers are passed on unchanged. No 'ack' header is added here; when none is given
	 * the broker applies its default (per the STOMP specification that is 'auto').
	 *
	 * Please note, however, while working with temporary queues, where the subscription request
	 * creates the underlying queue, during reconnect it might miss messages. This issue is not
	 * specific to this library but the way STOMP brokers are designed to work.
	 *
	 * @param queueName destination to subscribe to
	 * @param headers additional STOMP headers for the SUBSCRIBE frame
	 * @returns shared Observable emitting every message received on the destination
	 */
	public subscribe(queueName: string, headers: StompHeaders = {}): Observable<Message> {

		/**
		 * We need to activate the underlying subscription immediately if Stomp is connected. If not it should
		 * subscribe when it gets next connected. Further it should re establish the subscription whenever Stomp
		 * successfully reconnects.
		 *
		 * We filter the BehaviorSubject 'state' so that we trigger whenever Stomp is connected. Since 'state'
		 * is a BehaviorSubject, if Stomp is already connected, it will immediately trigger.
		 *
		 * The observable that we return to caller remains same across all reconnects, so no special handling
		 * needed at the message subscriber.
		 */
		this.debug(`Request to subscribe ${queueName}`);

		const coldObservable = new Observable(
			(messages: Observer<Message>) => {
				/** Broker-side subscription of the current connection; replaced on every reconnect */
				let stompSubscription: StompSubscription | undefined;

				const stompConnectedSubscription: Subscription = this.connectObservable
					.subscribe(() => {
						this.debug(`Will subscribe to ${queueName}`);
						stompSubscription = this.client.subscribe(
							queueName,
							(message: Message) => messages.next(message),
							headers);
					});

				return () => {
					/** Cleanup function, will be called when no subscribers are left */
					this.debug(`Stop watching connection state (for ${queueName})`);
					stompConnectedSubscription.unsubscribe();

					// Only talk to the broker when a subscription was actually created.
					if (stompSubscription && this.state.getValue() === StompState.OPEN) {
						this.debug(`Will unsubscribe from ${queueName} at Stomp`);
						stompSubscription.unsubscribe();
					} else {
						this.debug(`Stomp not connected, no need to unsubscribe from ${queueName} at Stomp`);
					}
				};
			});

		/**
		 * Important - convert it to hot Observable - otherwise, if the user code subscribes
		 * to this observable twice, it will subscribe twice to the Stomp broker.
		 * A long but good explanatory article at https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339
		 */
		return coldObservable.pipe(
			share()
		);
	}

	/**
	 * Callback Functions
	 *
	 * Note the method signature: () => preserves lexical scope
	 * if we need to use this.x inside the function
	 */

	/**
	 * Log a message with a timestamp, capitalizing its first character.
	 * Non-string values are converted with String(); an empty message is logged as is.
	 */
	protected debug = (msg: unknown): void => {
		const text = typeof msg === 'string' ? msg : String(msg);
		console.log(new Date(), text.charAt(0).toUpperCase() + text.substring(1));
	}

	/** Callback run on successfully connecting to server */
	protected onConnect = (): void => {
		/** Indicate our connected state to observers */
		this.state.next(StompState.OPEN);
		// State is OPEN now, so publish() will really send what was queued while offline.
		this.sendQueuedMessages();
	}

	/**
	 * Handle errors from stomp.js
	 *
	 * When the error arrives while the connection is open or opening, the current client is
	 * shut down, FORCE_CLOSED is published and a fresh connect is scheduled.
	 *
	 * @param error the error frame handed over by the client, if any
	 */
	protected onStompError = (error?: unknown): void => {
		const description = this.describeError(error);
		this.debug(`Error: ${description}`);

		const currentState = this.state.getValue();
		/** Check for dropped connection */
		if (this.client && (currentState === StompState.OPEN || currentState === StompState.OPENING)) {
			console.log('STOMP: ' + description);
			// Deactivate the old client; merely dropping the reference would leave it
			// reconnecting on its own next to the client created by the scheduled connect().
			this.discardClient();
			this.state.next(StompState.FORCE_CLOSED);
			console.log('STOMP: Reconecting in 5 seconds');
			this.debug('force closed');

			this.scheduleReconnect();
		}
	}

	/** Build a readable one-line description of whatever the client reported as error. */
	private describeError(error: unknown): string {
		if (error === undefined || error === null) {
			return 'unknown error';
		}
		if (typeof error === 'object') {
			// Assumes the STOMP ERROR frame layout: a 'message' header with a short text and an
			// optional body with details. Anything else falls through to String().
			const frame = error as { headers?: { [key: string]: string }, body?: unknown };
			const parts = [frame.headers && frame.headers['message'], frame.body]
				.filter(part => typeof part === 'string' && part.length > 0);
			if (parts.length > 0) {
				return parts.join(' - ');
			}
		}
		return String(error);
	}

	/** Detach and deactivate the current client (if any) so it can no longer reconnect or change state. */
	private discardClient(): void {
		const staleClient = this.client;
		this.client = null;
		if (!staleClient) {
			return;
		}

		// Late callbacks of the discarded client must not touch the state of its successor.
		staleClient.onConnect = () => undefined;
		staleClient.onStompError = () => undefined;

		// deactivate() returns nothing or a Promise depending on the stompjs version.
		Promise.resolve(staleClient.deactivate())
			.catch((reason: unknown) => this.debug(`Error while deactivating client: ${reason}`));
	}

	/** Schedule a single reconnect attempt, replacing an already pending one. */
	private scheduleReconnect(): void {
		this.cancelReconnect();
		this.reconnectTimer = setTimeout(() => {
			this.reconnectTimer = null;
			this.connect();
		}, StompService.RECONNECT_DELAY_MS);
	}

	/**
	 * Cancel the pending reconnect attempt.
	 * @returns true when a reconnect was pending
	 */
	private cancelReconnect(): boolean {
		if (this.reconnectTimer === null) {
			return false;
		}
		clearTimeout(this.reconnectTimer);
		this.reconnectTimer = null;
		return true;
	}
}

