import { ensureAsync } from '../shared/utils/utils';
import { concatMap, Observable } from 'rxjs';

const RealtimeSubscription = (realtime, listen) => {
    const subscribers = [];

    const unsubscribe = (name) => {
        const subscriberIndex = subscribers.findIndex((s) => s.name === name);

        if (subscriberIndex > -1) {
            subscribers.splice(subscriberIndex, 1);
        }
    };

    const subscribe = (name, actions, callback) => {
        const subscriber = subscribers.find((s) => s.name === name);

        if (subscriber) {
            subscriber.actions = actions;
            subscriber.callback = callback;
        } else {
            subscribers.push({ name, actions, callback });
        }

        return {
            unsubscribe: () => unsubscribe(name),
        };
    };

    const publish = async (next) => {
        const pubTo = subscribers.filter((s) => s.actions.includes(next.action));

        if (pubTo.length) {
            // ensureAsync "converts" sync functions that throw into
            // async functions that reject, which allSettled handles more gracefully
            // thus supporting both sync and async callbacks while keeping the
            // concept of awaiting allSettled intact
            const promises = pubTo.map((s) => ensureAsync(() => s.callback(next)));
            const settlements = await Promise.allSettled(promises);

            settlements.forEach((s, i) => {
                if (s.status === 'rejected') {
                    console.warn('socket message publish error', {
                        ...s,
                        action: next.action,
                        name: pubTo[i].name,
                    });
                }
            });
        } else {
            console.log('no subscribers for action', next.action);
        }
    };

    const initializeSubscriptionService = async () => {
        await realtime.readyPromise;

        const listener = new Observable(async (sub) => {
            listen((message) => {
                const data = JSON.parse(message.data);
                sub.next(data);
            });
        });

        // piping through concatMap allows us to execute
        // synchronously -> ensure one event propagates and is
        // awaited before publishing the next (can change in the future,
        // but might require some testing and/or components keeping
        // track of their own messageQueues)
        listener.pipe(concatMap((x) => publish(x))).subscribe();
    };

    return {
        initializeSubscriptionService,
        subscribe,
    };
};

export { RealtimeSubscription };
