Show pageOld revisionsBacklinksBack to top This page is read only. You can view the source, but not change it. Ask your administrator if you think this is wrong. # RxPy ```python import rx stream = rx.subject.Subject() c = stream.pipe( # ops.take_while(lambda i: i < 60) ops.filter(lambda i: i < 60) ) c.subscribe(on_next=lambda i: print("i is {}".format(i))) stream.on_next(0) stream.on_next(50) stream.on_next(51) stream.on_next(49) stream.on_next(100) stream.on_next(-50) ``` ```python from rx.scheduler import ThreadPoolScheduler import multiprocessing optimal_thread_count = multiprocessing.cpu_count() pool_scheduler = ThreadPoolScheduler(optimal_thread_count + 1) composed = source.pipe( ops.filter(lambda d: not d['IsPlusAD'] and not d['IsPowerClickAD']), # ops.take(1), ops.map_indexed(lambda x, i: (x, i)), ops.delay(0.1, pool_scheduler) ) composed.subscribe(on_next=on_next_data) ``` open/rxpy.txt Last modified: 2024/10/05 06:15by 127.0.0.1