import { makeAutoObservable } from "mobx";
import {
  Bar,
  SubscribeBarsCallback,
  LibrarySymbolInfo,
  ResolutionString,
} from "src/charting_library/charting_library";
import WebSocketStore, {
  IStreamConfig,
  SubscriptionType,
} from "src/state/network/WebSocketHandler";
import dayjs from "dayjs";
import { TerminalRequestMode } from "../shared/TerminalSettingsStore";
import { parseTVChartDuration } from "./parse-utils";

const TRADES_URL = "trades";

export interface EmptyMsg {
  id: number;
  result: null;
}

export interface ISubscriptionHandler {
  callback: SubscribeBarsCallback;
  resolution: ResolutionString;
  lastDailyBar: Bar;
}

export type ChannelToSubscriptionMap = Map<string, ISubscriptionHandler>;

export interface ISettingsProvider {
  party: string;
  botUUID: string;
  pair: string;
  exchange: string;
  requestMode: TerminalRequestMode;
  accountUUID?: string;
  infoAcc?: string;
}

class StreamingStore {
  webSocketState: WebSocketStore;

  settingsProvider: ISettingsProvider;

  subscriptionResolution = new Map<string, ISubscriptionHandler>();

  channelToSubscription = new Map<string, ChannelToSubscriptionMap>();

  lastBarsCache = new Map<string, Bar>();

  private _fetchData: () => Promise<void> = async () => {};

  streamConfig: IStreamConfig | null = null;

  constructor(settings: ISettingsProvider) {
    this.settingsProvider = settings;

    this.webSocketState = new WebSocketStore({
      subCb: [this.updateProcessing],
    });

    makeAutoObservable(this);
  }

  get party() {
    return this.settingsProvider.party;
  }

  get botUUID() {
    return this.settingsProvider.botUUID;
  }

  get pair() {
    return this.settingsProvider.pair;
  }

  get exchange() {
    return this.settingsProvider.exchange;
  }

  get data() {
    return this.webSocketState.data;
  }

  get lockedConnect() {
    return this.webSocketState.socketStatus === 1 || this.webSocketState.socketStatus === 2;
  }

  get requestMode() {
    return this.settingsProvider.requestMode;
  }

  get channelString() {
    return this.pair;
  }

  get infoAcc() {
    return this.settingsProvider.infoAcc;
  }

  getStreamConfig = (type: SubscriptionType, pair: string): IStreamConfig => ({
    type,
    payload: {
      pair,
      exchange: this.exchange,
    },
  });

  // for first connect if WS mode supported
  openConnect = () => {
    if (this.requestMode === "WS") {
      this.setupWebSocket();
    }
  };

  updData = () => {
    switch (this.requestMode) {
      case "FETCH":
        if (!this.settingsProvider.exchange) return;

        this._checkOpenWS();

        this._fetchData();

        break;
      case "WS":
        if (!this.lockedConnect) this.setupWebSocket();
        break;

      default:
        break;
    }
  };

  private _checkOpenWS = () => {
    if (this.webSocketState.socketStatus !== 3 || this.webSocketState.socketStatus !== undefined) {
      this.webSocketState.closeWebSocket();
    }
  };

  setupWebSocket = async () => {
    await this._fetchData();

    this.webSocketState.setupWebSocket({
      url: TRADES_URL,
      config: this.streamConfig || undefined,
    });
  };

  setFetchUpd = (updCb: () => Promise<void>) => {
    this._fetchData = updCb;
  };

  setConfig = (config: IStreamConfig) => {
    this.streamConfig = config;

    // sending configuration to subscribe to the data stream
    if (this.webSocketState.socketStatus === 1) {
      this.webSocketState.sendMsgOnStream(this.streamConfig);
    }
  };

  updateProcessing = () => {
    if (!this.data) return;

    if (this.requestMode === "FETCH") return;
    if (this.data.type !== "trades") return;
    const {
      data: { price, amount, time },
      pair,
    } = this.data.result;

    const tradePrice = parseFloat(price);
    const tradeVolume = parseFloat(amount) * tradePrice;
    const channelString = pair;
    const subscriptionItem = this.channelToSubscription.get(channelString);

    if (subscriptionItem === undefined) {
      return;
    }

    for (const subscription of subscriptionItem.values()) {
      // to build bars based on price only
      const { lastDailyBar } = subscription;
      const lastDailyBarVolume = lastDailyBar.volume;

      const nextDailyBarTime = this.getNextDailyBarTime(lastDailyBar.time, subscription.resolution);

      let bar: Bar;
      if (time >= nextDailyBarTime) {
        // create new bar
        bar = {
          time: nextDailyBarTime,
          open: tradePrice,
          high: tradePrice,
          low: tradePrice,
          close: tradePrice,
          volume: tradeVolume,
        };
      } else {
        // upd last bar
        bar = {
          ...lastDailyBar,
          high: Math.max(lastDailyBar.high, tradePrice),
          low: Math.min(lastDailyBar.low, tradePrice),
          close: tradePrice,
          volume: (lastDailyBarVolume || 0) + tradeVolume,
        };
      }
      subscription.lastDailyBar = bar;
      subscription.callback(bar);
    }
  };

  setLastDailyBar = (channelString: string, resolution: ResolutionString, bar: Bar) => {
    const subscriptionItem = this.channelToSubscription.get(channelString);

    if (subscriptionItem === undefined) {
      return;
    }

    const subscriberUID = `${channelString}_#_${resolution}`;

    const subscriber = subscriptionItem.get(subscriberUID);

    if (subscriber) {
      subscriber.lastDailyBar = bar;

      subscriber.callback(bar);
    }
  };

  getNextDailyBarTime = (barTime: number, resolution: string) => {
    const { duration } = parseTVChartDuration(resolution);

    const durationBar = dayjs.duration(duration);

    const date = dayjs(barTime).add(durationBar);

    return date.valueOf();
  };

  unbindProcessing = () => {
    this.webSocketState.closeWebSocket();
  };

  subscribeOnStream = (
    symbolInfo: LibrarySymbolInfo,
    resolution: ResolutionString,
    onRealtimeCallback: SubscribeBarsCallback,
    subscriberUID: string,
    onResetCacheNeededCallback: () => void,
    lastDailyBar: Bar
  ) => {
    // fix for old strategies where the exchange symbol is used
    const [quote, base] = symbolInfo.full_name.split("_");

    const channelString = `${quote}_${base}`;

    const subHandler: ISubscriptionHandler = {
      callback: onRealtimeCallback,
      resolution,
      lastDailyBar,
    };

    const subscriptionItem = this.channelToSubscription.get(channelString);

    if (subscriptionItem) {
      // Already subscribed to the channel, use the existing subscription

      if (!subscriptionItem.get(subscriberUID)) {
        subscriptionItem.set(subscriberUID, subHandler);

        return;
      }
    }

    const newSubscriptionMap = new Map();
    newSubscriptionMap.set(subscriberUID, subHandler);

    this.channelToSubscription.set(channelString, newSubscriptionMap);

    const config = this.getStreamConfig("subscribe", channelString);

    this.setConfig(config);
  };

  unsubscribeFromStream = (subscriberUID: string) => {
    // Find a subscription with id === subscriberUID
    for (const channelString of this.channelToSubscription.keys()) {
      const subscriptionItem = this.channelToSubscription.get(channelString);

      if (!subscriptionItem) return;

      subscriptionItem.delete(subscriberUID);

      if (subscriptionItem.size === 0) {
        // Unsubscribe from the channel if it is the last handler

        const unsubscribeConfig = this.getStreamConfig("unsubscribe", channelString);

        // unsubscribe from stream if websocket connection is open
        if (this.webSocketState.socketStatus === 1) {
          this.webSocketState.sendMsgOnStream(unsubscribeConfig);
        }

        this.channelToSubscription.delete(channelString);
        break;
      }
    }
  };

  destroy = () => {};
}

export default StreamingStore;
