import numpy as np import matplotlib.pyplot as plt from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import time import os needs_update = True # flag global def plot_constellations(ref_file, rx_file, title="Constellation Comparison", save_path=None): ref_data = np.atleast_2d(np.loadtxt(ref_file)) rx_data = np.atleast_2d(np.loadtxt(rx_file)) x_ref, y_ref = ref_data[:,0], ref_data[:,1] x_rx, y_rx = rx_data[:,0], rx_data[:,1] plt.clf() # efface la figure précédente plt.scatter(x_ref, y_ref, color='dodgerblue', s=80, marker='o', edgecolors='k', label='Référence') plt.scatter(x_rx, y_rx, color='tomato', s=80, marker='x', alpha=0.6, label='Reçu') all_x = np.concatenate([x_ref, x_rx]) all_y = np.concatenate([y_ref, y_rx]) margin = 0.15 * max(np.ptp(all_x), np.ptp(all_y)) plt.xlim(min(all_x)-margin, max(all_x)+margin) plt.ylim(min(all_y)-margin, max(all_y)+margin) plt.grid(True, which='both', linestyle='--', linewidth=0.5, alpha=0.7) plt.axhline(0, color='black', linewidth=1) plt.axvline(0, color='black', linewidth=1) plt.xlabel('In-phase (I)', fontsize=12) plt.ylabel('Quadrature (Q)', fontsize=12) plt.title(title, fontsize=14, fontweight='bold') plt.gca().set_aspect('equal', adjustable='box') plt.legend() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') plt.pause(0.1) class FileChangeHandler(FileSystemEventHandler): def on_modified(self, event): global needs_update if event.src_path.endswith("constellation_ref.dat") or event.src_path.endswith("constellation.dat"): print(f"{event.src_path} modifié") needs_update = True # on ne fait que signaler if __name__ == "__main__": ref_file = "constellation_ref.dat" rx_file = "constellation.dat" plt.ion() plt.figure(figsize=(7,7)) plot_constellations(ref_file, rx_file) event_handler = FileChangeHandler() observer = Observer() observer.schedule(event_handler, path=os.path.dirname(os.path.abspath(ref_file)) or '.', recursive=False) observer.start() try: while True: if needs_update: plot_constellations(ref_file, rx_file) needs_update = False time.sleep(0.2) # boucle principale except KeyboardInterrupt: observer.stop() observer.join()