import _ from "lodash";
import { channel, Channel, buffers } from "redux-saga";
import { call, cancelled, delay, put, race, take } from "redux-saga/effects";
import { stopEvaluation } from "legacy/actions/evaluationActions";
import { DataTreeFactory } from "legacy/entities/DataTree/dataTreeFactory";
import log from "utils/logger";
import type { default as WebpackWorker } from "worker-loader!";

const EVAL_STOP_MESSAGE = "User navigated away during evaluation";

type GenReturn<R> = Generator<any, R, any>;

/**
 * Wrap a webworker to provide a synchronous request-response semantic.
 *
 * Usage on main thread:
 * w = GracefulWorkerService(Worker);
 * yield w.start(); // Start the worker
 * const workerResponse = yield w.request("my_action", { hello: "world" }); // Send a request, wait for response
 *
 * Worker will receive:
 * {
 *   method: "my_action",
 *   requestId: "<unique request id>",
 *   requestData: { hello: "world" },
 * }
 *
 * Worker is expected to respond with an object with exactly the `requestId`, `timeTaken` and `responseData` keys:
 * {
 *   requestId: "<the id it received>",
 *   responseData: 42,
 *   timeTaken: 23.33,
 * }
 * All other keys will be ignored.
 * We make no assumptions about data type of `requestData` or `responseData`.
 *
 * Note: The worker will hold ALL requests, even in case of restarts.
 * If we do not want that behaviour, we should create a new GracefulWorkerService.
 */
// TODO: Add a compatible listener layer on the worker to complete the framework.
// TODO: Extract the worker wrapper into a library to be useful to anyone with WebWorkers + redux-saga.
// TODO: Add support for timeouts on requests and shutdown.
// TODO: Add a readiness + liveness probes.
export class GracefulWorkerService {
  // We keep track of all in-flight requests with these channels.
  private readonly _channels: Map<string, Channel<any>>;
  // The class of the WebWorker, because we can't re-import multiple times
  private _workerClass: (Worker & { new (): Worker }) | undefined;
  // The actual WebWorker
  private _evaluationWorker: WebpackWorker | undefined;

  // Channels in redux-saga are NOT like signals.
  // They operate in `pulse` mode of a signal. But `readiness` is more like a continuous signal.
  // This variable provides the equivalent of the `hold` state signal.
  // If isReady is false, wait on `this._readyChan` to get the pulse signal.
  private _isReady: boolean;
  // Channel to signal all waiters that we're ready. Always use it with `this._isReady`.
  private readonly _readyChan: Channel<any>;

  private readonly _workerLoader: () => Promise<{
    default: typeof WebpackWorker;
  }>;

  constructor({
    workerLoader,
  }: {
    workerLoader: () => Promise<{ default: typeof WebpackWorker }>;
  }) {
    this.shutdown = this.shutdown.bind(this);
    this.start = this.start.bind(this);
    this.request = this.request.bind(this);
    this.requestWithMeta = this.requestWithMeta.bind(this);
    this.ready = this.ready.bind(this);
    this._broker = this._broker.bind(this);

    // Do not buffer messages on this channel
    this._readyChan = channel(buffers.none());
    this._isReady = false;
    this._channels = new Map<string, Channel<any>>();
    this._workerLoader = workerLoader;
  }

  /**
   * Start a new worker and registers our broker.
   * Note: If the worker is already running, this is a no-op
   */
  *start(): Generator<any, any, any> {
    if (this._isReady || this._evaluationWorker) return;

    if (!this._workerClass) {
      const module = yield this._workerLoader();
      this._workerClass = module.default;
    }

    const worker = new this._workerClass!();
    worker.addEventListener("message", this._broker);
    this._evaluationWorker = worker;
    // Inform all pending requests that we're good to go!
    this._isReady = true;
    yield put(this._readyChan, true);
  }

  /**
   * Gracefully shutdown the worker.
   * Note: If the worker is already stopped / shutting down, this is a no-op
   */
  *shutdown() {
    // When the worker shuts down it loses its copy of the data tree
    // and forces it to use the full data tree on next startup
    DataTreeFactory.clearCachedDataTree();

    if (!this._isReady) return;
    // stop accepting new requests
    this._isReady = false;
    // wait for current responses to drain, check every 10 milliseconds
    while (this._channels.size > 0) {
      yield delay(10);
    }
    // close the worker
    if (!this._evaluationWorker) return;
    this._evaluationWorker.removeEventListener("message", this._broker);
    this._evaluationWorker.terminate();
    this._evaluationWorker = undefined;
  }

  /**
   * Check if the worker is ready, optionally block on it.
   */
  *ready(block = false) {
    if (this._isReady && this._evaluationWorker) return true;
    if (block) {
      yield take(this._readyChan);
      return true;
    }
    return false;
  }

  *requestWithMeta(
    method: string,
    requestData: any,
  ): GenReturn<{ responseData: any; timeInWorker: string } | undefined> {
    yield call(this.ready, true);
    if (!this._evaluationWorker) console.log("[] skipping request", method);
    // Impossible case, but helps avoid `?` later in code and makes it clearer.
    if (!this._evaluationWorker) throw new Error("Worker not ready");

    /**
     * We create a unique channel to wait for a response of this specific request.
     */
    const requestId = `${method}__${_.uniqueId()}`;
    const ch = channel();
    this._channels.set(requestId, ch);
    const mainThreadStartTime = performance.now();
    let timeTaken;

    try {
      this._evaluationWorker.postMessage({
        method,
        requestData: requestData ?? {},
        requestId,
      });
      // The `this._broker` method is listening to events and will pass response to us over this channel.
      const { response, stopped } = yield race({
        response: take(ch),
        stopped: take(stopEvaluation.type),
      });
      if (stopped) {
        // This case happens if the user navigates away during evaluation
        throw new Error(EVAL_STOP_MESSAGE);
      }
      timeTaken = response.timeTaken;
      const { responseData } = response;
      return { responseData, timeInWorker: timeTaken };
    } catch (e: any) {
      if (e.message !== EVAL_STOP_MESSAGE) {
        log.error(`Failed to postMessage ${e.message}`);
      }
    } finally {
      // Log perf of main thread and worker
      const mainThreadEndTime = performance.now();
      const timeTakenOnMainThread = mainThreadEndTime - mainThreadStartTime;
      if (yield cancelled()) {
        log.debug(
          `Main ${method} cancelled in ${timeTakenOnMainThread.toFixed(2)}ms`,
        );
      } else {
        log.debug(`Main ${method} took ${timeTakenOnMainThread.toFixed(2)}ms`);
      }

      if (timeTaken) {
        const transferTime = timeTakenOnMainThread - Number(timeTaken);
        log.debug(`Worker ${method} took ${timeTaken}ms`);
        log.debug(`Transfer ${method} took ${transferTime.toFixed(2)}ms`);
      }
      // Cleanup
      yield ch.close();
      this._channels.delete(requestId);
    }
  }

  /**
   * Send a request to the worker for processing.
   * If the worker isn't ready, we wait for it to become ready.
   *
   * @param method identifier for a rpc method
   * @param requestData data that we want to send over to the worker
   *
   * @returns response from the worker
   */
  *request(method: string, requestData = {}): any {
    const { responseData } = yield call(
      this.requestWithMeta,
      method,
      requestData,
    );
    return responseData;
  }

  private _broker(event: MessageEvent) {
    if (!event || !event.data) {
      return;
    }
    const { requestId, responseData, timeTaken } = event.data;
    const ch = this._channels.get(requestId);
    // Channel could have been deleted if the request gets cancelled before the WebWorker can respond.
    // In that case, we want to drop the request.
    if (ch) {
      ch.put({ responseData, timeTaken });
    }
  }
}
