import { AsyncSink } from "ix";
import _ from "lodash";
import { ListenOnce, listenOnce } from "./ListenOnce";
import { PendingData } from "./PendingData";

/** Async sequence of `PendingData` items; the element type of `LocalDataSocket.dataStreams`. */
export type PendingDataStream = AsyncGenerator<PendingData, void>;

/** Controls for a local websocket listening server, plus the data streams it produces. */
export interface LocalDataSocket {
  /** Open a websocket listener server. */
  listen: () => void;

  /** Stop the listening server. */
  close: () => void;

  /** Return true if listening for websocket connections. */
  listening: () => boolean;

  /** Stream of multiple data transactions.
   *
   * Within each data transaction, the header is sent
   * separately, followed by 0 or more data blocks, followed by complete:
   *  (header, data*, complete?)*
   *
   * Most errors (e.g. from network disconnects or server restarts) are handled internally and
   * are not reported.
   */
  dataStreams: AsyncGenerator<PendingDataStream, void>;
}

/** Options accepted by `socketListener`; the whole object is also handed to `listenOnce`. */
export interface SocketListenerParams {
  /** Factory that returns a `WebSocket` for the given url. */
  makeWebSocket: (url: string) => WebSocket;
  /** NOTE(review): presumably the websocket port; it is not read in this file, only forwarded — confirm in `listenOnce`. */
  port?: number;
  /** Delay in milliseconds before a new listener is started once the last one has finished (`socketListener` defaults this to 500). */
  reconnectDelay?: number;
}

/**
 * Optional dashboard and browser identifiers.
 *
 * NOTE(review): not referenced in the visible part of this file; confirm the exact
 * meaning of each field with the code that consumes this type.
 */
export interface DashboardInfo {
  currentDashboard?: string;
  browserId?: string;
}

/**
 * Return a server that can listen on a websocket port for data pipe connections.
 *
 * Every listener created through `listenOnce` is published as one stream on
 * `dataStreams`. A further listener is started as soon as the current one yields a
 * `header` item, and again `reconnectDelay` milliseconds (default 500) after the last
 * active listener has finished, for as long as the server has not been closed.
 *
 * @param params - listener options; the whole object is forwarded to `listenOnce`
 * @returns controls for the listening server and the stream of data streams
 */
export function socketListener(params: SocketListenerParams): LocalDataSocket {
  let closed = true;
  // Handle of the single pending reconnect attempt, if any.
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  const listeners: ListenOnce[] = [];
  const { reconnectDelay = 500 } = params;
  const streamsSink = new AsyncSink<AsyncGenerator<PendingData, void>>();

  /** Start listening, unless a listener is already active. */
  function listen(): void {
    closed = false;
    if (listeners.length === 0) {
      // A reconnect that is still pending would otherwise start a second
      // listener right next to the one started here.
      cancelReconnect();
      anotherListener();
    } else {
      console.error(
        "ignoring attempt to listen(). We already have an active web socket server."
      );
    }
  }

  /** Publish a new listener stream, unless the server has been closed. */
  function anotherListener(): void {
    if (!closed) {
      streamsSink.write(listenerProxy());
    }
  }

  /** Drop the pending reconnect attempt, if there is one. */
  function cancelReconnect(): void {
    if (reconnectTimer !== undefined) {
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
    }
  }

  /** Arrange for one new listener after `reconnectDelay`, at most one at a time. */
  function scheduleReconnect(): void {
    if (closed || reconnectTimer !== undefined) {
      return;
    }
    reconnectTimer = setTimeout(() => {
      reconnectTimer = undefined;
      // Re-check at fire time: listen() may have started a listener meanwhile.
      if (listeners.length === 0) {
        anotherListener();
      }
    }, reconnectDelay);
  }

  /**
   * Relay the items of one `listenOnce` listener while keeping `listeners` up to date.
   * Starts the next listener when a `header` item arrives.
   */
  async function* listenerProxy(): AsyncGenerator<PendingData, void> {
    const oneListen = listenOnce(params);
    listeners.push(oneListen);
    try {
      for await (const next of oneListen.pending) {
        if (next.kind === "header") {
          anotherListener();
        }
        yield next;
      }
    } finally {
      _.remove(listeners, (l) => l === oneListen);
      if (listeners.length < 1) {
        scheduleReconnect();
      }
    }
  }

  /** True between a call to `listen()` and the next call to `close()`. */
  function listening(): boolean {
    return !closed;
  }

  /** Stop listening: cancel any pending reconnect and close every active listener. */
  function close(): void {
    closed = true;
    cancelReconnect();
    listeners.forEach((l) => l.close());
    listeners.length = 0;
  }

  /** Expose the internal sink as a plain async generator. */
  async function* publicStreams(): AsyncGenerator<PendingDataStream, void> {
    for await (const next of streamsSink) {
      yield next;
    }
  }

  return {
    listen,
    close,
    listening,
    dataStreams: publicStreams(),
  };
}
