import { apiManager, FpApi } from "@tcs-rliess/fp-core";
import { fpLog } from "@tcs-rliess/fp-log";
import autobind from "autobind-decorator";
import { EventEmitter } from "events";
import { castArray } from "lodash-es";
import { DateTime } from "luxon";
import { action, observable, when } from "mobx";
import * as mqtt from "mqtt";

import { FleetplanApp } from "../../FleetplanApp";

/**
 * Connection state exposed by {@link Mqtt.clientState}.
 * - "offline": initial state before `connect()` was called
 * - "connecting": a (re)connect attempt is in progress
 * - "online": the client reported a successful connect
 */
type clientStateType = "offline" | "connecting" | "online";

/**
 * Callback invoked for every message received on a subscribed topic.
 * `topic` is passed without the app prefix; `message` is the JSON-parsed payload,
 * or the raw payload string when it could not be parsed as JSON.
 */
export type MqttListener<T = any> = (topic: string, message: T) => void;

/** A received message as kept in the message history of the `Mqtt` class. */
export interface MqttMessageStoreItem {
	/** time (UTC) at which the message was handled by the client */
	time: DateTime;
	/** topic without the app prefix */
	topic: string;
	/** raw payload converted to a string */
	message: string;
}

/**
 * Wrapper around the MQTT.js client used by the app.
 *
 * - All topics passed to `publish`, `subscribe` and `unsubscribe` are relative to the
 *   per-`dscid` prefix; listeners receive the topic without that prefix.
 * - Reconnecting is done by this class with an exponential back off (the reconnect logic
 *   of MQTT.js is turned off).
 * - After every successful (re)connect all known topics are subscribed again and a
 *   `"reconnect"` event is emitted.
 */
export class Mqtt extends EventEmitter {
	/** current connection state, stays "offline" until `connect()` is called */
	@observable public clientState: clientStateType = "offline";
	/** registered listeners by (un-prefixed) topic filter */
	@observable public subscribedTopics: Map<string, Set<MqttListener>> = new Map();

	/** we keep a few messages around for the dev bar */
	@observable messages: MqttMessageStoreItem[] = [];

	private client: mqtt.Client;
	/** number of failed attempts since the last successful connect, drives the back off */
	private retries = 0;
	private get prefix(): string { return `fp/v0/dscid/${this.app.dscid}`; }
	private authData: FpApi.Security.MqttAuthInfo;

	private app: FleetplanApp;
	private authenticationService = apiManager.getService(FpApi.Security.AuthenticationService);

	constructor(app: FleetplanApp) {
		super();
		this.app = app;
	}

	/**
	 * Open the MQTT Client. It may not be connected immediately, but the instance can be fully used
	 * (publish / subscribe) after the first `connect()` call.
	 *
	 * @throws Error when the app context is not authorized or has no `dscaid`
	 */
	@autobind
	public async connect(): Promise<void> {
		if (this.client?.connected === true) {
			fpLog.log("[MQTT] already connected");
			return;
		}

		if (this.app.ctx?.isAuthorized === false) throw new Error("Can't connect MQTT, needs to be logged in.");
		if (this.app.ctx?.dscaid == null) throw new Error("Can't connect MQTT, needs dscaid.");

		fpLog.log("[MQTT] connecting...");
		this.setClientState("connecting");

		// update auth data (first connect or expired credentials)
		if (this.authData == null || new Date(this.authData.expireAt) < new Date()) {
			try {
				fpLog.log("[MQTT] fetching auth...");
				this.authData = await this.authenticationService.mqttAuth(this.app.ctx);

				if (this.client) {
					this.client.options.username = this.authData.user;
					this.client.options.password = this.authData.password;
				}
			} catch (err) {
				fpLog.error("[MQTT] Couldn't fetch authData:", err);
				if (err.code === "UNAUTHORIZED_RPC_CALL") {
					// likely logged out
					fpLog.error("[MQTT] UNAUTHORIZED_RPC_CALL, reloading...");

					if (window.loginToken) {
						window.location.assign("https://" + window.location.host + "/#/loginToken/restrictedUser/" + encodeURIComponent(window.loginToken));
						window.location.reload();
					} else {
						window.location.reload();
					}
				} else {
					this.retries++;
					this.reconnectWithBackOff();
				}
				return;
			}
		}

		if (this.client == null) {
			this.client = mqtt.connect(`wss://${window.location.host}/mqtt`, {
				username: this.authData.user,
				password: this.authData.password,

				// turn off reconnectPeriod - we have our own exponential back off
				reconnectPeriod: 0,
				// random keep alive between 15 and 29 seconds (30 seconds is default max for mqtt)
				keepalive: Math.round(Math.random() * 14 + 15),

				// make us go offline if we lose the connection
				will: {
					topic: `${this.prefix}/presence/${this.app.ctx.dscaid}`,
					retain: true,
					payload: "", // sending an empty payload with retain true will delete the message from the retained messages store
					qos: 0,
				},
			});

			this.client.on("connect", () => {
				this.setClientState("online");
				this.retries = 0;

				// subscriptions have to be sent again after every (re)connect
				const topics = Array.from(this.subscribedTopics.keys());
				Promise.all(topics.map(topic => {
					fpLog.info(`[MQTT] resubbing ${topic}`);
					return this.subscribeInternal(topic);
				})).then(() => {
					fpLog.info("[MQTT] connected");
					this.emit("reconnect");
				}).catch(e => {
					fpLog.error("[MQTT] connect error", e);
				});
			});

			this.client.on("error", error => {
				fpLog.error("[MQTT] Error", error);
			});

			this.client.on("close", () => {
				// if set to offline, don't reconnect
				if (this.clientState === "offline") {
					fpLog.log("[MQTT] connection closed");
					return;
				}

				// try to reconnect
				this.setClientState("connecting");
				this.retries++;
				this.reconnectWithBackOff();
			});

			this.client.on("message", this.handleMessage);
		} else {
			// reuse the existing client (and its event handlers) with the current credentials
			this.client.options.username = this.authData.user;
			this.client.options.password = this.authData.password;
			this.client.reconnect();
		}
	}

	/**
	 * Publish a message. If the client does not exist yet, publishing is deferred until it does.
	 *
	 * @param topic topic relative to the app prefix
	 * @param message payload to send
	 * @param options MQTT.js publish options
	 * @param cb optional callback passed through to MQTT.js
	 */
	@autobind
	public publish(topic: string, message: string, options: mqtt.IClientPublishOptions = {}, cb?: mqtt.PacketCallback): void {
		when(
			// `client` itself is not observable - reading `clientState` makes mobx re-evaluate
			// this predicate on state changes, otherwise a publish before `connect()` would never be sent
			() => this.clientState != null && this.client != null,
			() => {
				this.client.publish(`${this.prefix}${topic}`, message, options, cb);
			},
		);
	}

	/**
	 * Check a concrete topic against an MQTT topic filter.
	 * `+` matches exactly one level, a trailing `#` matches the parent level and any number of child levels.
	 */
	private static matchesTopicFilter(filter: string, topic: string): boolean {
		const filterLevels = filter.split("/");
		const topicLevels = topic.split("/");

		for (let i = 0; i < filterLevels.length; i++) {
			const level = filterLevels[i];
			// "#" is only valid as the last level of a filter
			if (level === "#") return i === filterLevels.length - 1;
			if (i >= topicLevels.length) return false;
			if (level !== "+" && level !== topicLevels[i]) return false;
		}

		return filterLevels.length === topicLevels.length;
	}

	/**
	 * Handler for all incoming messages: stores the message in the history and
	 * dispatches it to the listeners registered for the topic (exact match or wildcard filter).
	 */
	@autobind
	private handleMessage(topic: string, payload: Buffer): void {
		const stringifiedPayload = payload.toString();
		topic = topic.slice(this.prefix.length);

		// this is the history displayed in the dev bar
		this.messages.unshift({
			time: DateTime.utc(),
			topic: topic,
			message: stringifiedPayload,
		});
		if (this.messages.length > 200) this.messages.length = 200;

		// listeners: exact matches first, then everything subscribed through a wildcard filter
		const wildcardListeners = Array
			.from(this.subscribedTopics)
			.filter(([filter]) => (filter.includes("#") || filter.includes("+")) && Mqtt.matchesTopicFilter(filter, topic))
			.reduce((previous, [, current]) => {
				previous.push(...Array.from(current));
				return previous;
			}, [] as MqttListener[]);

		const listeners = [
			...Array.from(this.subscribedTopics.get(topic) || []),
			...wildcardListeners,
		];

		if (listeners.length === 0) {
			fpLog.warn(`[MQTT] no listeners for ${topic}`);
		}

		/* It can happen, that stringifiedPayload is an empty string - listeners are not called in that case */
		if (!stringifiedPayload) return;

		// listeners get the parsed JSON, or the raw string if the payload is not valid JSON
		let message: unknown = stringifiedPayload;
		try {
			message = JSON.parse(stringifiedPayload);
		} catch (err) {
			fpLog.error("[MQTT] couldn't parse payload as JSON, passing raw string", { topic, payload: stringifiedPayload, err });
		}

		// every listener is isolated, a throwing listener must not affect the others
		listeners.forEach((listener) => {
			try {
				listener(topic, message);
			} catch (err) {
				fpLog.error("[MQTT] MQTT listener Error", { topic, payload: stringifiedPayload, err });
			}
		});
	}

	@action.bound
	private setClientState(state: clientStateType): void {
		this.clientState = state;
	}

	/** Schedule the next `connect()` attempt, the delay grows with `this.retries`. */
	@autobind
	private reconnectWithBackOff(): void {
		const time = Math.min(
			// retry after (2^retries => delay, plus up to 1s of jitter):
			//  2 =>  1s
			//  4 =>  2s
			//  8 =>  4s
			// 16 =>  8s
			// 32 => 16s
			// 64 => 31s
			Math.pow(2, this.retries) * 500 + Math.round(Math.random() * 1000),
			// but at least every 30s
			Math.round((30 + Math.random()) * 1000)
		);

		fpLog.log(`[MQTT] reconnecting in ${time}ms (retry ${this.retries})`);

		setTimeout(() => {
			this.connect().catch(err => fpLog.error(err));
		}, time);
	}

	/**
	 * Register a listener for one or more topics and subscribe to every topic that is not subscribed yet.
	 *
	 * @param topics topic (filter) or list of topics, relative to the app prefix
	 * @param listener called for every message on the given topics
	 * @returns resolves once all new topics are subscribed (immediately if there is no client yet)
	 */
	@autobind
	public async subscribe(topics: string | string[], listener?: MqttListener): Promise<void> {
		const pending: Promise<void>[] = [];

		for (const topic of castArray(topics)) {
			const existing = this.subscribedTopics.get(topic);
			if (existing != null) {
				if (listener != null) existing.add(listener);
				continue;
			}

			// fill the set before storing it, the observable map may keep its own copy
			const listeners = new Set<MqttListener>();
			if (listener != null) listeners.add(listener);
			this.subscribedTopics.set(topic, listeners);
			pending.push(this.subscribeInternal(topic));
		}

		await Promise.all(pending);
	}

	/**
	 * Remove a listener from one or more topics. A topic is unsubscribed once its last listener is gone.
	 *
	 * @param topics topic (filter) or list of topics, relative to the app prefix
	 * @param listener the listener passed to `subscribe`
	 */
	@autobind
	public unsubscribe(topics: string | string[], listener: MqttListener): void {
		for (const topic of castArray(topics)) {
			const set = this.subscribedTopics.get(topic);
			// unknown topic, still process the remaining ones
			if (!set) continue;
			set.delete(listener);

			if (set.size === 0) {
				this.subscribedTopics.delete(topic);
				this.unsubscribeInternal(topic);
			}
		}
	}

	/**
	 * Send the subscription for a topic, retried after a short random delay until it succeeds.
	 * Resolves right away if there is no client yet - all topics are subscribed on connect.
	 */
	@autobind
	private subscribeInternal(topic: string): Promise<void> {
		if (this.client == null) return Promise.resolve();

		return new Promise((resolve, reject) => {
			// fpLog.log(`[MQTT] subscribe: ${topic}`);
			this.client.subscribe(`${this.prefix}${topic}`, (err) => {
				if (err) {
					fpLog.debug(`[MQTT] subscribe error: ${topic}`);
					setTimeout(() => {
						// topic was unsubscribed in the meantime, nothing left to do
						if (!this.subscribedTopics.has(topic)) {
							resolve();
							return;
						}
						this.subscribeInternal(topic).then(resolve).catch(reject);
					}, Math.min(Math.random() * 1000, 500));
				} else {
					// fpLog.debug(`[MQTT] subscribe success: ${topic}`);
					resolve();
				}
			});
		});
	}

	/** Send the unsubscribe for a topic, retried after a short random delay until it succeeds. */
	@autobind
	private unsubscribeInternal(topic: string): void {
		if (this.client == null) return;

		// fpLog.log(`[MQTT] unsubscribe: ${topic}`);
		this.client.unsubscribe(`${this.prefix}${topic}`, (err) => {
			if (err) {
				fpLog.debug(`[MQTT] unsubscribe error: ${topic}`);
				setTimeout(() => {
					// topic was subscribed again in the meantime, don't remove the new subscription
					if (this.subscribedTopics.has(topic)) return;
					this.unsubscribeInternal(topic);
				}, Math.min(Math.random() * 1000, 500));
			} else {
				// fpLog.debug(`[MQTT] unsubscribe success: ${topic}`, resp);
			}
		});
	}
}
