export type ResponseStream<T> = {
  then<TResult1 = T, TResult2 = never>(
    onFulfilled?:
      | ((value: T) => TResult1 | PromiseLike<TResult1>)
      | undefined
      | null,
    onRejected?:
      | ((reason: unknown) => TResult2 | PromiseLike<TResult2>)
      | undefined
      | null,
    onProgress?: (status: { progress: number }) => void,
  ): Promise<TResult1 | TResult2>;

  catch<TResult = never>(
    onRejected?:
      | ((reason: unknown) => TResult | PromiseLike<TResult>)
      | undefined
      | null,
  ): Promise<T | TResult>;

  // ensure backwards compatibility with legacy progress integrations
  progress(
    onProgress?: (status: { progress: number }) => void,
  ): ResponseStream<T>;

  // ensure backwards compatibility with legacy progress integrations
  on(
    evt: "progress",
    handler: (status: { progress: number }) => void,
  ): ResponseStream<T>;
};

export function runExportService<T>(
  request: Request | Promise<Request>,
): ResponseStream<T> {
  //
  // STEP: prepare the responseStream external interface
  //
  const progressListeners = new Set<(progress: number) => void>();
  const responseStream = Object.freeze<ResponseStream<T>>({
    then(onFulfilled, onRejected, onProgress) {
      if (onProgress) {
        progressListeners.add((progress) =>
          onProgress({ progress: progress * 100 }),
        );
      }

      return promise.then(onFulfilled, onRejected);
    },

    catch(onRejected) {
      return promise.catch(onRejected);
    },

    progress(onProgress): ResponseStream<T> {
      if (onProgress) {
        progressListeners.add((progress) =>
          onProgress({ progress: progress * 100 }),
        );
      }

      return responseStream;
    },

    on(evt, onProgress): ResponseStream<T> {
      progressListeners.add((progress) =>
        onProgress({ progress: progress * 100 }),
      );
      return responseStream;
    },
  });

  const notify = (progress: number) => {
    progressListeners.forEach((listener) => listener(progress));
  };

  //
  // STEP: run the request and create the actual promise object, which will
  // handle fulfillment of the process
  //

  const promise = Promise.resolve(request).then(
    (request) =>
      new Promise<T>((resolve, reject) => {
        fetch(request)
          .then(async (response) => {
            if (!response.ok) {
              throw new Error(
                `runExportService(): fetch failed: ${response.status} ${response.statusText}`,
              );
            }

            if (!response.body) {
              throw new Error(
                "runExportService(): fetch failed to provide a body stream",
              );
            }

            for await (const payload of yieldPayloadsFromNDJSON(
              response.body,
            )) {
              if (typeof payload !== "object" || payload == null) {
                continue;
              }

              if (
                "progress" in payload &&
                typeof payload["progress"] === "number"
              ) {
                notify(payload["progress"]);
                continue;
              }

              if ("output" in payload) {
                resolve(payload["output"] as T);
                return;
              }

              if ("error" in payload && payload["error"] === true) {
                throw new Error("internal server error");
              }
            }

            // we've reached the end of the response stream, but we never got to
            // extract the desired output URL - throw an error
            throw new Error(
              "runExportService(): reached end of reader stream, without receiving expected output",
            );
          })
          .catch((err) => {
            reject(err);
          });
      }),
  );

  return responseStream;
}

/**
 * utility that'll stream the response from a fetch request, and yield the
 * individual payloads as parsed JSON-objects
 */
async function* yieldPayloadsFromNDJSON(
  stream: ReadableStream<Uint8Array>,
): AsyncGenerator<unknown, void> {
  const reader = stream.pipeThrough(new TextDecoderStream()).getReader();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();

    if (value !== undefined) {
      buffer += value;

      // iterate over all complete payloads by splitting the response when
      // hitting newlines, and the parse and yield each one
      let delimeterIndex: number;
      while ((delimeterIndex = buffer.indexOf("\n")) !== -1) {
        yield JSON.parse(buffer.substring(0, delimeterIndex));

        // remove the payload that we just yielded from the buffer, so it's not
        // yielded again
        buffer = buffer.substring(delimeterIndex + 1);
      }
    }

    if (done) {
      break;
    }
  }

  if (buffer) {
    // yield the final payload, if it wasn't wrapped with a newline
    yield JSON.parse(buffer);
  }
}
