anomaly-detection-material-parameters-calibration

Sionna param calibration (research proj)
git clone https://git.ea.contact/anomaly-detection-material-parameters-calibration
Log | Files | Refs | README

measurements.py (3180B)


      1 """
      2 This file contains an implementation of a data loader for
      3 the measurement data used from doi: 10.1109/JCS64661.2025.10880629
      4 
      5 The data directory should contain files named in the format:
      6 "Round_<round_idx>_AP_<ap_idx>_RF_<rf_idx>_Sec_<sec_idx>.mat"
      7 where <round_idx>, <ap_idx>, <rf_idx>, and <sec_idx> are integers.
      8 The <round_idx> is the round index, <ap_idx> is the access point index,
      9 <rf_idx> is the RF index, and <sec_idx> is the second index.
     10 
     11 Each file must be loadable with scipy.io.loadmat
     12 and must contain a field named 'cirs' with the shape (512, 1000) each.
     13 """
     14 
     15 import numpy as np
     16 from scipy.io import loadmat as scipy_io_loadmat
     17 
     18 from os import listdir as os_listdir
     19 from os.path import join as os_path_join
     20 from functools import reduce as functools_reduce
     21 from functools import partial as functools_partial
     22 from itertools import product as itertools_product
     23 from multiprocessing import Pool as multiprocessing_Pool
     24 
     25 
     26 def load(
     27     data_dirpath: str,
     28     round_idx: int
     29 ) -> np.ndarray:
     30     """Load the measurements round from the given data directory.
     31     Args:
     32         data_dirpath (str): The path to the directory containing the files.
     33         round (int): The round index.
     34     Returns:
     35         cirs_truth (np.ndarray):
     36             The channel impulse responses.
     37             Shape: (num_aps, num_rfs, num_secs, 512, 1000)
     38     Note:
     39         - The measurements filenames are expected to be in the format:
     40             "Round_<round_idx>_AP_<ap_idx>_RF_<rf_idx>_Sec_<sec_idx>.mat"
     41     """
     42 
     43     # Get the rounds, aps, rfs, secs from the filenames
     44     aps = set()
     45     rfs = set()
     46     secs = {}
     47     for filename in os_listdir(data_dirpath):
     48         if not filename.endswith(".mat") and not filename.startswith("Round_"):
     49             continue
     50         s = filename[6:-4].split("_")
     51         if len(s) != 7:
     52             continue
     53         if int(s[0]) != round_idx:
     54             continue
     55         ap_idx, rf_idx, sec_idx = map(int, (s[2], s[4], s[6]))
     56         aps.add(ap_idx)
     57         rfs.add(rf_idx)
     58         secs[(ap_idx, rf_idx)] = secs.get((ap_idx, rf_idx), set())
     59         secs[(ap_idx, rf_idx)].add(sec_idx)
     60     # Inner join of all available seconds
     61     secs = sorted(functools_reduce(
     62         set.intersection,
     63         (s for s in secs.values())
     64     ))
     65 
     66     # Get num_aps, num_rfs, num_seconds
     67     num_aps = len(aps)
     68     num_rfs = len(rfs)
     69     num_secs = len(secs)
     70 
     71     # Load the measurements
     72     with multiprocessing_Pool() as pool:
     73         func = functools_partial(
     74             load_and_store,
     75             data_dirpath=data_dirpath,
     76             round_idx=round_idx
     77         )
     78         results = pool.map(
     79             func,
     80             itertools_product(aps, rfs, secs)
     81         )
     82     cirs = np.empty(
     83         (num_aps, num_rfs, num_secs, 512, 1000),
     84         dtype=np.complex128
     85     )
     86     for j, k, l, cir in results:
     87         cirs[j, k, l, :] = cir
     88 
     89     return cirs
     90 
     91 
     92 def load_and_store(args, data_dirpath, round_idx):
     93     ap, rf, sec = args
     94     filepath = os_path_join(
     95         data_dirpath,
     96         f"Round_{round_idx}_AP_{ap}_RF_{rf}_Sec_{sec}.mat"
     97     )
     98     cir = scipy_io_loadmat(filepath)['cirs']
     99     assert cir.shape == (512, 1000)
    100     return ap-1, rf, sec-1, cir