Estoy buscando la forma más rápida (en términos de latencia) de comunicar entre dos procesos el hecho de que se ha producido un evento.
Para ser más precisos, tengo una matriz numpy en una memoria compartida donde un proceso (productor) escribe actualizaciones en una matriz y otro (consumidor) las lee.
Se requiere el uso de multiprocesamiento porque necesitamos superar GIL. Producer es un proceso pesado de CPU/IO que escucha el flujo de datos y realiza algún procesamiento de datos.
Consumer es un proceso muy ligero y en su mayoría inactivo, pero debemos despertarlo lo más rápido posible cuando Producer actualiza una matriz.
Una cosa más. Es más importante activar al consumidor con una latencia mínima que transferir todos los mensajes. (Por ejemplo, en caso de que el productor envíe tres mensajes seguidos sin demora y el consumidor reciba solo el primero y pierda los dos siguientes, está bien).
He probado primitivas de multiprocesamiento Pipe, Queue, Event para este propósito, parece que son casi iguales en términos de latencia. La tubería es la más estable.
import multiprocessing as mp import numpy as np import random import time ITER_COUNT = 1000 def get_mcs_diff(ts): return round((time.time() - ts) * 1e6, 0) def main(v, input_pipe): for _ in range(ITER_COUNT): v.value = time.time() input_pipe.send(None) time.sleep((0.1 + random.random()) / 100) if __name__ == "__main__": v = mp.Value('d', time.time()) (ip, op) = mp.Pipe() p = mp.Process(target=main, args=(v, ip,)) measurements = [] p.start() i = 0 while i < ITER_COUNT: op.recv() measurements.append(get_mcs_diff(v.value)) i += 1 print(np.percentile(measurements, [50, 90, 95, 99], axis=0)) p.join()
# Output # 50, 90, 95, 99 percentiles in microseconds # > [138. 206.1 238. 383.21]
import multiprocessing as mp import numpy as np import random import time ITER_COUNT = 1000 def get_mcs_diff(ts): return round((time.time() - ts) * 1e6, 0) def main(v, q): for _ in range(ITER_COUNT): v.value = time.time() q.put(None) time.sleep((0.1 + random.random()) / 100) if __name__ == "__main__": v = mp.Value('d', time.time()) q = mp.Queue() p = mp.Process(target=main, args=(v, q,)) measurments = [] p.start() i = 0 while i < ITER_COUNT: q.get() measurments.append(get_mcs_diff(v.value)) i += 1 print(measurments) print(np.percentile(measurments, [50, 90, 95, 99], axis=0)) p.join()
# Output # 50, 90, 95, 99 percentiles in microseconds # > [187. 266. 299.05 444.06]
import multiprocessing as mp import numpy as np import random import time ITER_COUNT = 1000 def get_mcs_diff(ts): return round((time.time() - ts) * 1e6, 0) def main(v, e): for _ in range(ITER_COUNT): v.value = time.time() e.set() time.sleep((0.1 + random.random()) / 100) if __name__ == "__main__": v = mp.Value('d', time.time()) e = mp.Event() p = mp.Process(target=main, args=(v, e,)) measurments = [] p.start() i = 0 while i < ITER_COUNT: e.wait() measurments.append(get_mcs_diff(v.value)) i += 1 e.clear() print(np.percentile(measurments, [50, 90, 95, 99], axis=0)) p.join()
# Output # 50, 90, 95, 99 percentiles in microseconds # > [142. 222.1 256.05 1754.77]
import multiprocessing as mp import numpy as np import random import time ITER_COUNT = 1000 def get_mcs_diff(ts): return round((time.time() - ts) * 1e6, 0) def main(v): time.sleep(1) for _ in range(ITER_COUNT): v.value = time.time() # print(v.value) time.sleep((0.1 + random.random()) / 100) if __name__ == "__main__": v = mp.Value('d', time.time()) p = mp.Process(target=main, args=(v,)) measurments = [] p.start() i = 0 v_prev = 0 while i < ITER_COUNT: # print(v_prev - v.value) if v_prev < v.value: measurments.append(get_mcs_diff(v.value)) v_prev = float(v.value) i += 1 print(np.percentile(measurments, [50, 90, 95, 99], axis=0)) p.join()
# Output # 50, 90, 95, 99 percentiles in microseconds # > [ 33. 65. 81. 128.05]
Hasta ahora, el bucle ocupado es la opción más rápida. Pero me gustaría evitarlo por una razón obvia.