import multiprocessing

from rx.scheduler import ThreadPoolScheduler

# Size the scheduler's thread pool from the machine's CPU count, plus one
# extra worker.
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count + 1)

# Build the processing pipeline on top of `source` (defined elsewhere):
#   1. keep only items where both 'IsPlusAD' and 'IsPowerClickAD' are falsy,
#   2. pair every surviving item with its index as an (item, index) tuple,
#   3. delay each emission by 0.1 using `pool_scheduler`
#      (presumably seconds, per RxPY's float due-time convention -- confirm).
composed = source.pipe(
    ops.filter(lambda d: not d['IsPlusAD'] and not d['IsPowerClickAD']),
    # ops.take(1),  # NOTE(review): disabled in the original; looks like a debug limiter -- confirm
    ops.map_indexed(lambda x, i: (x, i)),
    ops.delay(0.1, pool_scheduler),
)

# Each (item, index) tuple produced by the pipeline is handed to
# `on_next_data` (defined elsewhere).
composed.subscribe(on_next=on_next_data)