//#region Imports

import { Observable, OperatorFunction, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

//#endregion

/**
 * Subscribes a observable only once. The subscription will be unsubscribed when the observable is completed.
 *
 * @public
 */
export function subscribeOnce<T>(observable: Observable<T>, next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): { observable: Observable<T> } {
    const subscription = observable.subscribe(next, error, () => {
        if (complete) {
            complete();
        }

        subscription.unsubscribe();
    });

    return { observable };
}

export function cancelPrevious<T>(source$: Observable<T>): OperatorFunction<T, T> {
// Create a subject that will be used to cancel previous requests
    let cancelSubject = new Subject();

    return (source$: Observable<T>) => {
        cancelSubject.next(true); // Cancel previous requests
        cancelSubject.complete(); // Complete the subject
        cancelSubject = new Subject(); // Create a new subject
        return source$.pipe(takeUntil(cancelSubject));
    };
}
