anomaly-detection-material-parameters-calibration

Sionna param calibration (research proj)
git clone https://git.ea.contact/anomaly-detection-material-parameters-calibration
Log | Files | Refs | README

system_level_channel.py (16277B)


      1 #
      2 # SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
      3 # SPDX-License-Identifier: Apache-2.0
      4 #
      5 """Base class for implementing system level channel models from 3GPP TR38.901
      6 specification"""
      7 
      8 import tensorflow as tf
      9 import numpy as np
     10 import matplotlib.pyplot as plt
     11 
     12 from . import LSPGenerator
     13 from . import RaysGenerator
     14 from . import Topology, ChannelCoefficientsGenerator
     15 from sionna.channel import ChannelModel
     16 from sionna.channel.utils import deg_2_rad
     17 
     18 class SystemLevelChannel(ChannelModel):
     19     # pylint: disable=line-too-long
     20     r"""
     21     Baseclass for implementing 3GPP system level channel models, such as UMi,
     22     UMa, and RMa.
     23 
     24     Parameters
     25     -----------
     26     scenario : SystemLevelScenario
     27         Scenario for the channel simulation
     28 
     29     always_generate_lsp : bool
     30         If `True`, new large scale parameters (LSPs) are generated for every
     31         new generation of channel impulse responses. Otherwise, always reuse
     32         the same LSPs, except if the topology is changed. Defaults to
     33         `False`.
     34 
     35     Input
     36     -----
     37 
     38     num_time_samples : int
     39         Number of time samples
     40 
     41     sampling_frequency : float
     42         Sampling frequency [Hz]
     43 
     44     Output
     45     -------
     46         a : [batch size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths, num_time_samples], tf.complex
     47             Path coefficients
     48 
     49         tau : [batch size, num_rx, num_tx, num_paths], tf.float
     50             Path delays [s]
     51     """
     52 
     53     def __init__(self, scenario, always_generate_lsp=False):
     54 
     55         self._scenario = scenario
     56         self._lsp_sampler = LSPGenerator(scenario)
     57         self._ray_sampler = RaysGenerator(scenario)
     58         self._set_topology_called = False
     59 
     60         if scenario.direction == "uplink":
     61             tx_array = scenario.ut_array
     62             rx_array = scenario.bs_array
     63         else: # "downlink"
     64             tx_array = scenario.bs_array
     65             rx_array = scenario.ut_array
     66         self._cir_sampler = ChannelCoefficientsGenerator(
     67                                             scenario.carrier_frequency,
     68                                             tx_array, rx_array,
     69                                             subclustering=True,
     70                                             dtype = scenario.dtype)
     71 
     72         # Are new LSPs needed
     73         self._always_generate_lsp = always_generate_lsp
     74 
     75     def set_topology(self, ut_loc=None, bs_loc=None, ut_orientations=None,
     76         bs_orientations=None, ut_velocities=None, in_state=None, los=None):
     77         r"""
     78         Set the network topology.
     79 
     80         It is possible to set up a different network topology for each batch
     81         example. The batch size used when setting up the network topology
     82         is used for the link simulations.
     83 
     84         When calling this function, not specifying a parameter leads to the
     85         reuse of the previously given value. Not specifying a value that was not
     86         set at a former call rises an error.
     87 
     88         Input
     89         ------
     90             ut_loc : [batch size,num_ut, 3], tf.float
     91                 Locations of the UTs
     92 
     93             bs_loc : [batch size,num_bs, 3], tf.float
     94                 Locations of BSs
     95 
     96             ut_orientations : [batch size,num_ut, 3], tf.float
     97                 Orientations of the UTs arrays [radian]
     98 
     99             bs_orientations : [batch size,num_bs, 3], tf.float
    100                 Orientations of the BSs arrays [radian]
    101 
    102             ut_velocities : [batch size,num_ut, 3], tf.float
    103                 Velocity vectors of UTs
    104 
    105             in_state : [batch size,num_ut], tf.bool
    106                 Indoor/outdoor state of UTs. `True` means indoor and `False`
    107                 means outdoor.
    108 
    109             los : tf.bool or `None`
    110                 If not `None` (default value), all UTs located outdoor are
    111                 forced to be in LoS if ``los`` is set to `True`, or in NLoS
    112                 if it is set to `False`. If set to `None`, the LoS/NLoS states
    113                 of UTs is set following 3GPP specification [TR38901]_.
    114 
    115         Note
    116         ----
    117         If you want to use this function in Graph mode with XLA, i.e., within
    118         a function that is decorated with ``@tf.function(jit_compile=True)``,
    119         you must set ``sionna.Config.xla_compat=true``.
    120         See :py:attr:`~sionna.Config.xla_compat`.
    121         """
    122 
    123         # Update the scenario topology
    124         need_for_update = self._scenario.set_topology(  ut_loc,
    125                                                         bs_loc,
    126                                                         ut_orientations,
    127                                                         bs_orientations,
    128                                                         ut_velocities,
    129                                                         in_state,
    130                                                         los)
    131 
    132         if need_for_update:
    133             # Update the LSP sampler
    134             self._lsp_sampler.topology_updated_callback()
    135 
    136             # Update the ray sampler
    137             self._ray_sampler.topology_updated_callback()
    138 
    139             # Sample LSPs if no need to generate them everytime
    140             if not self._always_generate_lsp:
    141                 self._lsp = self._lsp_sampler()
    142 
    143         if not self._set_topology_called:
    144             self._set_topology_called = True
    145 
    146     def __call__(self, num_time_samples, sampling_frequency, foo=None):
    147 
    148         # Some channel layers (GenerateOFDMChannel and GenerateTimeChannel)
    149         # give as input (batch_size, num_time_samples, sampling_frequency)
    150         # instead of (num_time_samples, sampling_frequency), as specified
    151         # in the ChannelModel interface.
    152         # With this model, the batch size is ignored, and only the required
    153         # parameters are kept.
    154         if foo is not None:
    155             # batch_size = num_time_samples
    156             num_time_samples = sampling_frequency
    157             sampling_frequency = foo
    158 #             if ( (batch_size is not None)
    159 #                     and tf.not_equal(batch_size,self._scenario.batch_size) ):
    160 #                 tf.print("Warning: The value of `batch_size` specified when \
    161 # calling the channel model is different from the one previously configured for \
    162 # the topology. The value specified when calling is ignored.")
    163 
    164         # Sample LSPs if required
    165         if self._always_generate_lsp:
    166             lsp = self._lsp_sampler()
    167         else:
    168             lsp = self._lsp
    169 
    170         # Sample rays
    171         rays = self._ray_sampler(lsp)
    172 
    173         # Sample channel responses
    174         # First we need to create a topology
    175         # Indicates which end of the channel is moving: TX or RX
    176         if self._scenario.direction == 'downlink':
    177             moving_end = 'rx'
    178             tx_orientations = self._scenario.bs_orientations
    179             rx_orientations = self._scenario.ut_orientations
    180         else : # 'uplink'
    181             moving_end = 'tx'
    182             tx_orientations = self._scenario.ut_orientations
    183             rx_orientations = self._scenario.bs_orientations
    184         topology = Topology(    velocities=self._scenario.ut_velocities,
    185                                 moving_end=moving_end,
    186                                 los_aoa=deg_2_rad(self._scenario.los_aoa),
    187                                 los_aod=deg_2_rad(self._scenario.los_aod),
    188                                 los_zoa=deg_2_rad(self._scenario.los_zoa),
    189                                 los_zod=deg_2_rad(self._scenario.los_zod),
    190                                 los=self._scenario.los,
    191                                 distance_3d=self._scenario.distance_3d,
    192                                 tx_orientations=tx_orientations,
    193                                 rx_orientations=rx_orientations)
    194 
    195         # The channel coefficient needs the cluster delay spread parameter in ns
    196         c_ds = self._scenario.get_param("cDS")*1e-9
    197 
    198         # According to the link direction, we need to specify which from BS
    199         # and UT is uplink, and which is downlink.
    200         # Default is downlink, so we need to do some tranpose to switch tx and
    201         # rx and to switch angle of arrivals and departure if direction is set
    202         # to uplink. Nothing needs to be done if direction is downlink
    203         if self._scenario.direction == "uplink":
    204             aoa = rays.aoa
    205             zoa = rays.zoa
    206             aod = rays.aod
    207             zod = rays.zod
    208             rays.aod = tf.transpose(aoa, [0, 2, 1, 3, 4])
    209             rays.zod = tf.transpose(zoa, [0, 2, 1, 3, 4])
    210             rays.aoa = tf.transpose(aod, [0, 2, 1, 3, 4])
    211             rays.zoa = tf.transpose(zod, [0, 2, 1, 3, 4])
    212             rays.powers = tf.transpose(rays.powers, [0, 2, 1, 3])
    213             rays.delays = tf.transpose(rays.delays, [0, 2, 1, 3])
    214             rays.xpr = tf.transpose(rays.xpr, [0, 2, 1, 3, 4])
    215             los_aod = topology.los_aod
    216             los_aoa = topology.los_aoa
    217             los_zod = topology.los_zod
    218             los_zoa = topology.los_zoa
    219             topology.los_aoa = tf.transpose(los_aod, [0, 2, 1])
    220             topology.los_aod = tf.transpose(los_aoa, [0, 2, 1])
    221             topology.los_zoa = tf.transpose(los_zod, [0, 2, 1])
    222             topology.los_zod = tf.transpose(los_zoa, [0, 2, 1])
    223             topology.los = tf.transpose(topology.los, [0, 2, 1])
    224             c_ds = tf.transpose(c_ds, [0, 2, 1])
    225             topology.distance_3d = tf.transpose(topology.distance_3d, [0, 2, 1])
    226             # Concerning LSPs, only these two are used.
    227             # We do not transpose the others to reduce complexity
    228             k_factor = tf.transpose(lsp.k_factor, [0, 2, 1])
    229             sf = tf.transpose(lsp.sf, [0, 2, 1])
    230         else:
    231             k_factor = lsp.k_factor
    232             sf = lsp.sf
    233 
    234         # pylint: disable=unbalanced-tuple-unpacking
    235         h, delays = self._cir_sampler(num_time_samples, sampling_frequency,
    236                                       k_factor, rays, topology, c_ds)
    237 
    238         # Step 12
    239         h = self._step_12(h, sf)
    240 
    241         # Reshaping to match the expected output
    242         h = tf.transpose(h, [0, 2, 4, 1, 5, 3, 6])
    243         delays = tf.transpose(delays, [0, 2, 1, 3])
    244 
    245         # Stop gadients to avoid useless backpropagation
    246         h = tf.stop_gradient(h)
    247         delays = tf.stop_gradient(delays)
    248 
    249         return h, delays
    250 
    251     def show_topology(self, bs_index=0, batch_index=0):
    252         r"""
    253         Shows the network topology of the batch example with index
    254         ``batch_index``.
    255 
    256         The ``bs_index`` parameter specifies with respect to which BS the
    257         LoS/NLoS state of UTs is indicated.
    258 
    259         Input
    260         -------
    261         bs_index : int
    262             BS index with respect to which the LoS/NLoS state of UTs is
    263             indicated. Defaults to 0.
    264 
    265         batch_index : int
    266             Batch example for which the topology is shown. Defaults to 0.
    267         """
    268 
    269         def draw_coordinate_system(ax, loc, ort, delta):
    270             # This function draw the coordinate system x-y-z, represented by
    271             # three lines with colors red-green-blue (rgb), to show the
    272             # orientation of the array (LCS) in the GCS.
    273             # To always draw a visible and not too big axes, we scale them
    274             # according to the spread of the network in each direction.
    275 
    276             a = ort[0]
    277             b = ort[1]
    278             c = ort[2]
    279 
    280             arrow_ratio_size = 0.1
    281 
    282             x_ = np.array([ np.cos(a)*np.cos(b),
    283                             np.sin(a)*np.cos(b),
    284                             -np.sin(b) ])
    285             scale_x = arrow_ratio_size/np.sqrt(np.sum(np.square(x_/delta)))
    286             x_ = x_*scale_x
    287 
    288             y_ = np.array([ np.cos(a)*np.sin(b)*np.sin(c)-np.sin(a)*np.cos(c),
    289                             np.sin(a)*np.sin(b)*np.sin(c)+np.cos(a)*np.cos(c),
    290                             np.cos(b)*np.sin(c) ])
    291             scale_y = arrow_ratio_size/np.sqrt(np.sum(np.square(y_/delta)))
    292             y_ = y_*scale_y
    293 
    294             z_ = np.array([ np.cos(a)*np.sin(b)*np.cos(c)+np.sin(a)*np.sin(c),
    295                             np.sin(a)*np.sin(b)*np.cos(c)-np.cos(a)*np.sin(c),
    296                             np.cos(b)*np.cos(c)])
    297             scale_z = arrow_ratio_size/np.sqrt(np.sum(np.square(z_/delta)))
    298             z_ = z_*scale_z
    299 
    300             ax.plot([loc[0], loc[0] + x_[0]],
    301                     [loc[1], loc[1] + x_[1]],
    302                     [loc[2], loc[2] + x_[2]], c='r')
    303             ax.plot([loc[0], loc[0] + y_[0]],
    304                     [loc[1], loc[1] + y_[1]],
    305                     [loc[2], loc[2] + y_[2]], c='g')
    306             ax.plot([loc[0], loc[0] + z_[0]],
    307                     [loc[1], loc[1] + z_[1]],
    308                     [loc[2], loc[2] + z_[2]], c='b')
    309 
    310         indoor = self._scenario.indoor.numpy()[batch_index]
    311         los = self._scenario.los.numpy()[batch_index,bs_index]
    312 
    313         indoor_indices = np.where(indoor)
    314         los_indices = np.where(los)
    315         nlos_indices = np.where(np.logical_and(np.logical_not(indoor),
    316                                 np.logical_not(los)))
    317 
    318 
    319         ut_loc = self._scenario.ut_loc.numpy()[batch_index]
    320         bs_loc = self._scenario.bs_loc.numpy()[batch_index]
    321         ut_orientations = self._scenario.ut_orientations.numpy()[batch_index]
    322         bs_orientations = self._scenario.bs_orientations.numpy()[batch_index]
    323 
    324         delta_x = np.max(np.concatenate([ut_loc[:,0], bs_loc[:,0]]))\
    325             - np.min(np.concatenate([ut_loc[:,0], bs_loc[:,0]]))
    326         delta_y = np.max(np.concatenate([ut_loc[:,1], bs_loc[:,1]]))\
    327             - np.min(np.concatenate([ut_loc[:,1], bs_loc[:,1]]))
    328         delta_z = np.max(np.concatenate([ut_loc[:,2], bs_loc[:,2]]))\
    329             - np.min(np.concatenate([ut_loc[:,2], bs_loc[:,2]]))
    330         delta = np.array([delta_x, delta_y, delta_z])
    331 
    332         indoor_ut_loc = ut_loc[indoor_indices]
    333         los_ut_loc = ut_loc[los_indices]
    334         nlos_ut_loc = ut_loc[nlos_indices]
    335 
    336         fig = plt.figure()
    337         ax = fig.add_subplot(projection='3d')
    338         # Showing BS
    339         ax.scatter( bs_loc[:,0], bs_loc[:,1], bs_loc[:,2], c='k', label='BS',
    340                     depthshade=False)
    341         # Showing BS indices and orientations
    342         for u, loc in enumerate(bs_loc):
    343             ax.text(loc[0], loc[1], loc[2], f'{u}')
    344             draw_coordinate_system(ax, loc, bs_orientations[u], delta)
    345         # Showing UTs
    346         ax.scatter(indoor_ut_loc[:,0], indoor_ut_loc[:,1], indoor_ut_loc[:,2],
    347                     c='b', label='UT Indoor', depthshade=False)
    348         ax.scatter(los_ut_loc[:,0], los_ut_loc[:,1], los_ut_loc[:,2],
    349                     c='r', label='UT LoS', depthshade=False)
    350         ax.scatter(nlos_ut_loc[:,0], nlos_ut_loc[:,1], nlos_ut_loc[:,2],
    351                     c='y', label='UT NLoS', depthshade=False)
    352         # Showing UT indices and orientations
    353         for u, loc in enumerate(ut_loc):
    354             ax.text(loc[0], loc[1], loc[2], f'{u}')
    355             draw_coordinate_system(ax, loc, ut_orientations[u], delta)
    356         ax.set_xlabel('x [m]')
    357         ax.set_ylabel('y [m]')
    358         ax.set_zlabel('z [m]')
    359         plt.legend()
    360         plt.tight_layout()
    361 
    362     #####################################################
    363     # Internal utility methods
    364     #####################################################
    365 
    366     def _step_12(self, h, sf):
    367         # pylint: disable=line-too-long
    368         """Apply path loss and shadow fading ``sf`` to paths coefficients ``h``.
    369 
    370         Input
    371         ------
    372         h : [batch size, num_tx, num_rx, num_paths, num_rx_ant, num_tx_ant, num_time_samples], tf.complex
    373             Paths coefficients
    374 
    375         sf : [batch size, num_tx, num_rx]
    376             Shadow fading
    377         """
    378         if self._scenario.pathloss_enabled:
    379             pl_db = self._lsp_sampler.sample_pathloss()
    380             if self._scenario.direction == 'uplink':
    381                 pl_db = tf.transpose(pl_db, [0,2,1])
    382         else:
    383             pl_db = tf.constant(0.0, self._scenario.dtype.real_dtype)
    384 
    385         if not self._scenario.shadow_fading_enabled:
    386             sf = tf.ones_like(sf)
    387 
    388         gain = tf.math.pow(tf.constant(10., self._scenario.dtype.real_dtype),
    389             -(pl_db)/20.)*tf.sqrt(sf)
    390         gain = tf.reshape(gain, tf.concat([tf.shape(gain),
    391             tf.ones([tf.rank(h)-tf.rank(gain)], tf.int32)],0))
    392         h *= tf.complex(gain, tf.constant(0., self._scenario.dtype.real_dtype))
    393 
    394         return h