141 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| import { async } from '../scheduler/async';
 | |
| import { Subscriber } from '../Subscriber';
 | |
| import { isScheduler } from '../util/isScheduler';
 | |
| export function bufferTime(bufferTimeSpan) {
 | |
|     let length = arguments.length;
 | |
|     let scheduler = async;
 | |
|     if (isScheduler(arguments[arguments.length - 1])) {
 | |
|         scheduler = arguments[arguments.length - 1];
 | |
|         length--;
 | |
|     }
 | |
|     let bufferCreationInterval = null;
 | |
|     if (length >= 2) {
 | |
|         bufferCreationInterval = arguments[1];
 | |
|     }
 | |
|     let maxBufferSize = Number.POSITIVE_INFINITY;
 | |
|     if (length >= 3) {
 | |
|         maxBufferSize = arguments[2];
 | |
|     }
 | |
|     return function bufferTimeOperatorFunction(source) {
 | |
|         return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
 | |
|     };
 | |
| }
 | |
| class BufferTimeOperator {
 | |
|     constructor(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
 | |
|         this.bufferTimeSpan = bufferTimeSpan;
 | |
|         this.bufferCreationInterval = bufferCreationInterval;
 | |
|         this.maxBufferSize = maxBufferSize;
 | |
|         this.scheduler = scheduler;
 | |
|     }
 | |
|     call(subscriber, source) {
 | |
|         return source.subscribe(new BufferTimeSubscriber(subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler));
 | |
|     }
 | |
| }
 | |
| class Context {
 | |
|     constructor() {
 | |
|         this.buffer = [];
 | |
|     }
 | |
| }
 | |
| class BufferTimeSubscriber extends Subscriber {
 | |
|     constructor(destination, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
 | |
|         super(destination);
 | |
|         this.bufferTimeSpan = bufferTimeSpan;
 | |
|         this.bufferCreationInterval = bufferCreationInterval;
 | |
|         this.maxBufferSize = maxBufferSize;
 | |
|         this.scheduler = scheduler;
 | |
|         this.contexts = [];
 | |
|         const context = this.openContext();
 | |
|         this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
 | |
|         if (this.timespanOnly) {
 | |
|             const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
 | |
|             this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
 | |
|         }
 | |
|         else {
 | |
|             const closeState = { subscriber: this, context };
 | |
|             const creationState = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
 | |
|             this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
 | |
|             this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
 | |
|         }
 | |
|     }
 | |
|     _next(value) {
 | |
|         const contexts = this.contexts;
 | |
|         const len = contexts.length;
 | |
|         let filledBufferContext;
 | |
|         for (let i = 0; i < len; i++) {
 | |
|             const context = contexts[i];
 | |
|             const buffer = context.buffer;
 | |
|             buffer.push(value);
 | |
|             if (buffer.length == this.maxBufferSize) {
 | |
|                 filledBufferContext = context;
 | |
|             }
 | |
|         }
 | |
|         if (filledBufferContext) {
 | |
|             this.onBufferFull(filledBufferContext);
 | |
|         }
 | |
|     }
 | |
|     _error(err) {
 | |
|         this.contexts.length = 0;
 | |
|         super._error(err);
 | |
|     }
 | |
|     _complete() {
 | |
|         const { contexts, destination } = this;
 | |
|         while (contexts.length > 0) {
 | |
|             const context = contexts.shift();
 | |
|             destination.next(context.buffer);
 | |
|         }
 | |
|         super._complete();
 | |
|     }
 | |
|     _unsubscribe() {
 | |
|         this.contexts = null;
 | |
|     }
 | |
|     onBufferFull(context) {
 | |
|         this.closeContext(context);
 | |
|         const closeAction = context.closeAction;
 | |
|         closeAction.unsubscribe();
 | |
|         this.remove(closeAction);
 | |
|         if (!this.closed && this.timespanOnly) {
 | |
|             context = this.openContext();
 | |
|             const bufferTimeSpan = this.bufferTimeSpan;
 | |
|             const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
 | |
|             this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
 | |
|         }
 | |
|     }
 | |
|     openContext() {
 | |
|         const context = new Context();
 | |
|         this.contexts.push(context);
 | |
|         return context;
 | |
|     }
 | |
|     closeContext(context) {
 | |
|         this.destination.next(context.buffer);
 | |
|         const contexts = this.contexts;
 | |
|         const spliceIndex = contexts ? contexts.indexOf(context) : -1;
 | |
|         if (spliceIndex >= 0) {
 | |
|             contexts.splice(contexts.indexOf(context), 1);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| function dispatchBufferTimeSpanOnly(state) {
 | |
|     const subscriber = state.subscriber;
 | |
|     const prevContext = state.context;
 | |
|     if (prevContext) {
 | |
|         subscriber.closeContext(prevContext);
 | |
|     }
 | |
|     if (!subscriber.closed) {
 | |
|         state.context = subscriber.openContext();
 | |
|         state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
 | |
|     }
 | |
| }
 | |
| function dispatchBufferCreation(state) {
 | |
|     const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
 | |
|     const context = subscriber.openContext();
 | |
|     const action = this;
 | |
|     if (!subscriber.closed) {
 | |
|         subscriber.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
 | |
|         action.schedule(state, bufferCreationInterval);
 | |
|     }
 | |
| }
 | |
| function dispatchBufferClose(arg) {
 | |
|     const { subscriber, context } = arg;
 | |
|     subscriber.closeContext(context);
 | |
| }
 | |
| //# sourceMappingURL=bufferTime.js.map
 | 
