paths.py (43545B)
1 # 2 # SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 3 # SPDX-License-Identifier: Apache-2.0 4 # 5 """ 6 Dataclass that stores paths 7 """ 8 9 import tensorflow as tf 10 import os 11 import numpy as np 12 13 from . import scene as scene_module 14 from sionna.utils.tensors import expand_to_rank, insert_dims 15 from sionna.constants import PI 16 from .utils import dot, r_hat 17 18 class Paths: 19 # pylint: disable=line-too-long 20 r""" 21 Paths() 22 23 Stores the simulated propagation paths 24 25 Paths are generated for the loaded scene using 26 :meth:`~sionna.rt.Scene.compute_paths`. Please refer to the 27 documentation of this function for further details. 28 These paths can then be used to compute channel impulse responses: 29 30 .. code-block:: Python 31 32 paths = scene.compute_paths() 33 a, tau = paths.cir() 34 35 where ``scene`` is the :class:`~sionna.rt.Scene` loaded using 36 :func:`~sionna.rt.load_scene`. 37 """ 38 39 # Input 40 # ------ 41 42 # mask : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.bool 43 # Set to `False` for non-existent paths. 44 # When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. 45 # For such paths, the channel coefficient is set to `0` and the delay to `-1`. 46 47 # a : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex 48 # Channel coefficients :math:`a_i` as defined in :eq:`T_tilde`. 49 # If there are less than `max_num_path` valid paths between a 50 # transmit and receive antenna, the irrelevant elements are 51 # filled with zeros. 52 53 # tau : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 54 # Propagation delay of each path [s]. 55 # If :attr:`~sionna.rt.Scene.synthetic_array` is `True`, the shape of this tensor 56 # is `[1, num_rx, num_tx, max_num_paths]` as the delays for the 57 # individual antenna elements are assumed to be equal. 58 # If there are less than `max_num_path` valid paths between a 59 # transmit and receive antenna, the irrelevant elements are 60 # filled with -1. 61 62 # theta_t : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 63 # Zenith angles of departure :math:`\theta_{\text{T},i}` [rad]. 64 # If :attr:`~sionna.rt.Scene.synthetic_array` is `True`, the shape of this tensor 65 # is `[1, num_rx, num_tx, max_num_paths]` as the angles for the 66 # individual antenna elements are assumed to be equal. 67 68 # phi_t : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 69 # Azimuth angles of departure :math:`\varphi_{\text{T},i}` [rad]. 70 # See description of ``theta_t``. 71 72 # theta_r : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 73 # Zenith angles of arrival :math:`\theta_{\text{R},i}` [rad]. 74 # See description of ``theta_t``. 75 76 # phi_r : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 77 # Azimuth angles of arrival :math:`\varphi_{\text{T},i}` [rad]. 78 # See description of ``theta_t``. 79 80 # types : [batch_size, max_num_paths], tf.int 81 # Type of path: 82 83 # - 0 : LoS 84 # - 1 : Reflected 85 # - 2 : Diffracted 86 # - 3 : Scattered 87 88 # Types of paths 89 LOS = 0 90 SPECULAR = 1 91 DIFFRACTED = 2 92 SCATTERED = 3 93 RIS = 4 94 95 def __init__(self, 96 sources, 97 targets, 98 scene, 99 types=None): 100 101 dtype = scene.dtype 102 num_sources = sources.shape[0] 103 num_targets = targets.shape[0] 104 rdtype = dtype.real_dtype 105 106 self._a = tf.zeros([num_targets, num_sources, 0], dtype) 107 self._tau = tf.zeros([num_targets, num_sources, 0], rdtype) 108 self._theta_t = tf.zeros([num_targets, num_sources, 0], rdtype) 109 self._theta_r = tf.zeros([num_targets, num_sources, 0], rdtype) 110 self._phi_t = tf.zeros([num_targets, num_sources, 0], rdtype) 111 self._phi_r = tf.zeros([num_targets, num_sources, 0], rdtype) 112 self._mask = tf.fill([num_targets, num_sources, 0], False) 113 self._targets_sources_mask = tf.fill([num_targets, num_sources, 0], False) 114 self._vertices = tf.zeros([0, num_targets, num_sources, 0, 3], rdtype) 115 self._objects = tf.fill([0, num_targets, num_sources, 0], -1) 116 self._doppler = tf.zeros([num_targets, num_sources, 0], rdtype) 117 if types is None: 118 self._types = tf.fill([0], -1) 119 else: 120 self._types = types 121 122 self._sources = sources 123 self._targets = targets 124 self._scene = scene 125 126 # Is the direction reversed? 127 self._reverse_direction = False 128 # Normalize paths delays? 129 self._normalize_delays = False 130 131 def to_dict(self): 132 # pylint: disable=line-too-long 133 r""" 134 Returns the properties of the paths as a dictionary which values are 135 tensors 136 137 Output 138 ------- 139 : `dict` 140 """ 141 members_names = dir(self) 142 members_objects = [getattr(self, attr) for attr in members_names] 143 data = {attr_name[1:] : attr_obj for (attr_obj, attr_name) 144 in zip(members_objects,members_names) 145 if not callable(attr_obj) and 146 not isinstance(attr_obj, scene_module.Scene) and 147 not attr_name.startswith("__") and 148 attr_name.startswith("_")} 149 return data 150 151 def from_dict(self, data_dict): 152 # pylint: disable=line-too-long 153 r""" 154 Set the paths from a dictionary which values are tensors 155 156 The format of the dictionary is expected to be the same as the one 157 returned by :meth:`~sionna.rt.Paths.to_dict()`. 158 159 Input 160 ------ 161 data_dict : `dict` 162 """ 163 for attr_name in data_dict: 164 attr_obj = data_dict[attr_name] 165 setattr(self, '_' + attr_name, attr_obj) 166 167 def export(self, filename): 168 r""" 169 export(filename) 170 171 Saves the paths as an OBJ file for visualisation, e.g., in Blender 172 173 Input 174 ------ 175 filename : str 176 Path and name of the file 177 """ 178 vertices = self.vertices 179 objects = self.objects 180 sources = self.sources 181 targets = self.targets 182 mask = self.targets_sources_mask 183 184 # Content of the obj file 185 r = '' 186 offset = 0 187 for rx in range(vertices.shape[1]): 188 tgt = targets[rx].numpy() 189 for tx in range(vertices.shape[2]): 190 src = sources[tx].numpy() 191 for p in range(vertices.shape[3]): 192 193 # If the path is masked, skip it 194 if not mask[rx,tx,p]: 195 continue 196 197 # Add a comment to describe this path 198 r += f'# Path {p} from tx {tx} to rx {rx}' + os.linesep 199 # Vertices and intersected objects 200 vs = vertices[:,rx,tx,p].numpy() 201 objs = objects[:,rx,tx,p].numpy() 202 203 depth = 0 204 # First vertex is the source 205 r += f"v {src[0]:.8f} {src[1]:.8f} {src[2]:.8f}"+os.linesep 206 # Add intersection points 207 for v,o in zip(vs,objs): 208 # Skip if no intersection 209 if o == -1: 210 continue 211 r += f"v {v[0]:.8f} {v[1]:.8f} {v[2]:.8f}" + os.linesep 212 depth += 1 213 r += f"v {tgt[0]:.8f} {tgt[1]:.8f} {tgt[2]:.8f}"+os.linesep 214 215 # Add the connections 216 for i in range(1, depth+2): 217 v0 = i + offset 218 v1 = i + offset + 1 219 r += f"l {v0} {v1}" + os.linesep 220 221 # Prepare for the next path 222 r += os.linesep 223 offset += depth+2 224 225 # Save the file 226 # pylint: disable=unspecified-encoding 227 with open(filename, 'w') as f: 228 f.write(r) 229 230 @property 231 def mask(self): 232 # pylint: disable=line-too-long 233 """ 234 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.bool : Set to `False` for non-existent paths. 235 When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. 236 For such paths, the channel coefficient is set to `0` and the delay to `-1`. 237 """ 238 return self._mask 239 240 @mask.setter 241 def mask(self, v): 242 self._mask = v 243 244 @property 245 def a(self): 246 # pylint: disable=line-too-long 247 """ 248 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex : Passband channel coefficients :math:`a_i` of each path as defined in :eq:`H_final`. 249 """ 250 return self._a 251 252 @a.setter 253 def a(self, v): 254 self._a = v 255 256 @property 257 def tau(self): 258 # pylint: disable=line-too-long 259 """ 260 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Propagation delay :math:`\\tau_i` [s] of each path as defined in :eq:`H_final`. 261 """ 262 return self._tau 263 264 @tau.setter 265 def tau(self, v): 266 self._tau = v 267 268 @property 269 def theta_t(self): 270 # pylint: disable=line-too-long 271 """ 272 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Zenith angles of departure [rad] 273 """ 274 return self._theta_t 275 276 @theta_t.setter 277 def theta_t(self, v): 278 self._theta_t = v 279 280 @property 281 def phi_t(self): 282 # pylint: disable=line-too-long 283 """ 284 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Azimuth angles of departure [rad] 285 """ 286 return self._phi_t 287 288 @phi_t.setter 289 def phi_t(self, v): 290 self._phi_t = v 291 292 @property 293 def theta_r(self): 294 # pylint: disable=line-too-long 295 """ 296 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Zenith angles of arrival [rad] 297 """ 298 return self._theta_r 299 300 @theta_r.setter 301 def theta_r(self, v): 302 self._theta_r = v 303 304 @property 305 def phi_r(self): 306 # pylint: disable=line-too-long 307 """ 308 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Azimuth angles of arrival [rad] 309 """ 310 return self._phi_r 311 312 @phi_r.setter 313 def phi_r(self, v): 314 self._phi_r = v 315 316 @property 317 def types(self): 318 """ 319 [batch_size, max_num_paths], tf.int : Type of the paths: 320 321 - 0 : LoS 322 - 1 : Reflected 323 - 2 : Diffracted 324 - 3 : Scattered 325 - 4 : RIS 326 """ 327 return self._types 328 329 @types.setter 330 def types(self, v): 331 self._types = v 332 333 @property 334 def sources(self): 335 # pylint: disable=line-too-long 336 """ 337 [num_sources, 3], tf.float : Sources from which rays (paths) are emitted 338 """ 339 return self._sources 340 341 @sources.setter 342 def sources(self, v): 343 self._sources = v 344 345 @property 346 def targets(self): 347 # pylint: disable=line-too-long 348 """ 349 [num_targets, 3], tf.float : Targets at which rays (paths) are received 350 """ 351 return self._targets 352 353 @targets.setter 354 def targets(self, v): 355 self._targets = v 356 357 @property 358 def normalize_delays(self): 359 """ 360 bool : Set to `True` to normalize path delays such that the first path 361 between any pair of antennas of a transmitter and receiver arrives at 362 ``tau = 0``. Defaults to `True`. 363 """ 364 return self._normalize_delays 365 366 @normalize_delays.setter 367 def normalize_delays(self, v): 368 if v == self._normalize_delays: 369 return 370 371 if ~v and self._normalize_delays: 372 self.tau += self._min_tau 373 else: 374 self.tau -= self._min_tau 375 self.tau = tf.where(self.tau<0, tf.cast(-1, self.tau.dtype) , self.tau) 376 self._normalize_delays = v 377 378 @property 379 def doppler(self): 380 # pylint: disable=line-too-long 381 """ 382 [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float : Doppler shift for each path 383 related to movement of objects. The Doppler shifts resulting from 384 movements of the transmitters or receivers will be computed from the inputs to the function :func:`~sionna.rt.Paths.apply_doppler`. 385 """ 386 return self._doppler 387 388 @doppler.setter 389 def doppler(self, v): 390 self._doppler = v 391 392 def apply_doppler(self, sampling_frequency, num_time_steps, 393 tx_velocities=(0.,0.,0.), rx_velocities=(0.,0.,0.)): 394 # pylint: disable=line-too-long 395 r""" 396 Apply Doppler shifts to all paths according to the velocities 397 of objects in the scene as well as the provided transmitter and receiver velocities. 398 399 This function replaces the last dimension of the tensor :attr:`~sionna.rt.Paths.a` storing the 400 time evolution of the paths' coefficients with a dimension of size ``num_time_steps``. 401 402 Time evolution of the channel coefficients is simulated by computing the 403 Doppler shift due to movements of scene objects, transmitters, and receivers. 404 To understand this process, let us consider a single propagation path undergoing 405 :math:`n` scattering processes, such as reflection, diffuse scattering, or diffraction, 406 as shown in the figure below. 407 408 .. figure:: ../figures/doppler.png 409 :align: center 410 411 The object on which lies the :math:`i\text{th}` scattering point has the velocity vector 412 :math:`\hat{\mathbf{v}}_i` and the outgoing ray direction at this point is 413 denoted :math:`\hat{\mathbf{k}}_i`. The first and last point correspond to the transmitter 414 and receiver, respectively. We therefore have 415 416 .. math:: 417 418 \hat{\mathbf{k}}_0 &= \hat{\mathbf{r}}(\theta_{\text{T}}, \varphi_{\text{T}})\\ 419 \hat{\mathbf{k}}_{n} &= -\hat{\mathbf{r}}(\theta_{\text{R}}, \varphi_{\text{R}}) 420 421 where :math:`(\theta_{\text{T}}, \varphi_{\text{T}})` are the AoDs, 422 :math:`(\theta_{\text{R}}, \varphi_{\text{R}})` are the AoAs, and :math:`\hat{\mathbf{r}}(\theta,\varphi)` is defined in :eq:`spherical_vecs`. 423 424 If the transmitter emits a signal with frequency :math:`f`, the receiver 425 will observe the signal at frequency :math:`f'=f + f_\Delta`, where :math:`f_\Delta` is the Doppler 426 shift, which can be computed as [Wiffen2018]_ 427 428 .. math:: 429 430 f' = f \prod_{i=0}^n \frac{1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}{1 - \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}}. 431 432 Under the assumption that :math:`\lVert \mathbf{v}_i \rVert\ll c`, we can apply the Taylor expansion :math:`(1-x)^{-1}\approx 1+x`, for :math:`x\ll 1`, to the previous equation 433 to obtain 434 435 .. math:: 436 437 f' &\approx f \prod_{i=0}^n \left(1 - \frac{\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\left(1 + \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i}{c}\right)\\ 438 &\approx f \left(1 + \sum_{i=0}^n \frac{\mathbf{v}_{i}^\mathsf{T}\hat{\mathbf{k}}_i -\mathbf{v}_{i+1}^\mathsf{T}\hat{\mathbf{k}}_i}{c} \right) 439 440 where the second line results from ignoring terms in :math:`c^{-2}`. Solving for :math:`f_\Delta`, grouping terms with the same :math:`\mathbf{v}_i` together, and using :math:`f=c/\lambda`, we obtain 441 442 .. math:: 443 444 f_\Delta = \frac{1}{\lambda}\left(\mathbf{v}_{0}^\mathsf{T}\hat{\mathbf{k}}_0 - \mathbf{v}_{n+1}^\mathsf{T}\hat{\mathbf{k}}_n + \sum_{i=1}^n \mathbf{v}_{i}^\mathsf{T}\left(\hat{\mathbf{k}}_i-\hat{\mathbf{k}}_{i-1} \right) \right) \qquad \text{[Hz]}. 445 446 Using this Doppler shift, the time-dependent path coefficient is computed as 447 448 .. math :: 449 450 a(t) = a e^{j2\pi f_\Delta t}. 451 452 Note that this model is only valid as long as the AoDs, AoAs, and path delays do not change significantly. 453 This is typically the case for very short time intervals. Large-scale mobility should be simulated by moving objects within the scene and recomputing the propagation paths. 454 455 When this function is called multiple times, it overwrites the previous time step dimension. 456 457 Input 458 ------ 459 sampling_frequency : float 460 Frequency [Hz] at which the channel impulse response is sampled 461 462 num_time_steps : int 463 Number of time steps. 464 465 tx_velocities : [batch_size, num_tx, 3] or broadcastable, tf.float | `None` 466 Velocity vectors :math:`(v_\text{x}, v_\text{y}, v_\text{z})` of all 467 transmitters [m/s]. 468 Defaults to `[0,0,0]`. 469 470 rx_velocities : [batch_size, num_rx, 3] or broadcastable, tf.float | `None` 471 Velocity vectors :math:`(v_\text{x}, v_\text{y}, v_\text{z})` of all 472 receivers [m/s]. 473 Defaults to `[0,0,0]`. 474 """ 475 476 dtype = self._scene.dtype 477 rdtype = dtype.real_dtype 478 zeror = tf.zeros((), rdtype) 479 two_pi = tf.cast(2.*PI, rdtype) 480 481 tx_velocities = tf.cast(tx_velocities, rdtype) 482 tx_velocities = expand_to_rank(tx_velocities, 3, 0) 483 if tx_velocities.shape[2] != 3: 484 raise ValueError("Last dimension of `tx_velocities` must equal 3") 485 486 if rx_velocities is None: 487 rx_velocities = [0.,0.,0.] 488 rx_velocities = tf.cast(rx_velocities, rdtype) 489 rx_velocities = expand_to_rank(rx_velocities, 3, 0) 490 if rx_velocities.shape[2] != 3: 491 raise ValueError("Last dimension of `rx_velocities` must equal 3") 492 493 sampling_frequency = tf.cast(sampling_frequency, rdtype) 494 if sampling_frequency <= 0.0: 495 raise ValueError("The sampling frequency must be positive") 496 497 num_time_steps = tf.cast(num_time_steps, tf.int32) 498 if num_time_steps <= 0: 499 msg = "The number of time samples must a positive integer" 500 raise ValueError(msg) 501 502 # Drop previous time step dimension, if any 503 if tf.rank(self.a) == 7: 504 self.a = self.a[...,0] 505 506 # [batch_size, num_rx, num_tx, max_num_paths, 3] 507 # or 508 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 3] 509 k_t = r_hat(self.theta_t, self.phi_t) 510 k_r = r_hat(self.theta_r, self.phi_r) 511 512 if self._scene.synthetic_array: 513 # [batch_size, num_rx, 1, num_tx, 1, max_num_paths, 3] 514 k_t = tf.expand_dims(tf.expand_dims(k_t, axis=2), axis=4) 515 # [batch_size, num_rx, 1, num_tx, 1, max_num_paths, 3] 516 k_r = tf.expand_dims(tf.expand_dims(k_r, axis=2), axis=4) 517 518 # Expand rank of the speed vector for broadcasting with k_r 519 # [batch_dim, 1, 1, num_tx, 1, 1, 3] 520 tx_velocities = insert_dims(insert_dims(tx_velocities, 2,1), 2,4) 521 # [batch_dim, num_rx, 1, 1, 1, 1, 3] 522 rx_velocities = insert_dims(rx_velocities, 4, 2) 523 524 # Generate time steps 525 # [num_time_steps] 526 ts = tf.range(num_time_steps, dtype=rdtype) 527 ts = ts / sampling_frequency 528 529 # Compute the Doppler shift 530 # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 531 # or 532 # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths] 533 tx_ds = two_pi*dot(tx_velocities, k_t)/self._scene.wavelength 534 rx_ds = two_pi*dot(rx_velocities, k_r)/self._scene.wavelength 535 ds = tx_ds + rx_ds 536 537 # Add Doppler shifts due to movement of scene objects 538 if self._scene.synthetic_array: 539 # Create dummy dims for tx and rx antennas 540 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 541 ds += two_pi*self.doppler[..., tf.newaxis, :, tf.newaxis, :] 542 else: 543 ds += two_pi*self.doppler 544 545 # Expand for the time sample dimension 546 # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] 547 # or 548 # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths, 1] 549 ds = tf.expand_dims(ds, axis=-1) 550 # Expand time steps for broadcasting 551 # [1, 1, 1, 1, 1, 1, num_time_steps] 552 ts = expand_to_rank(ts, tf.rank(ds), 0) 553 # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 554 # num_time_steps] 555 # or 556 # [batch_dim, num_rx, 1, num_tx, 1, max_num_paths, num_time_steps] 557 ds = ds*ts 558 exp_ds = tf.exp(tf.complex(zeror, ds)) 559 560 # Apply Doppler shift 561 # Expand with time dimension 562 # [batch_dim, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] 563 a = tf.expand_dims(self.a, axis=-1) 564 565 # Manual broadcast last dimension 566 a = tf.repeat(a, exp_ds.shape[6], -1) 567 568 a = a*exp_ds 569 570 self.a = a 571 572 @property 573 def reverse_direction(self): 574 r""" 575 bool : If set to `True`, swaps receivers and transmitters 576 """ 577 return self._reverse_direction 578 579 @reverse_direction.setter 580 def reverse_direction(self, v): 581 582 if v == self._reverse_direction: 583 return 584 585 if tf.rank(self.a) == 6: 586 self.a = tf.transpose(self.a, perm=[0,3,4,1,2,5]) 587 else: 588 self.a = tf.transpose(self.a, perm=[0,3,4,1,2,5,6]) 589 590 if self._scene.synthetic_array: 591 self.tau = tf.transpose(self.tau, perm=[0,2,1,3]) 592 self._min_tau = tf.transpose(self._min_tau, perm=[0,2,1,3]) 593 self.theta_t = tf.transpose(self.theta_t, perm=[0,2,1,3]) 594 self.phi_t = tf.transpose(self.phi_t, perm=[0,2,1,3]) 595 self.theta_r = tf.transpose(self.theta_r, perm=[0,2,1,3]) 596 self.phi_r = tf.transpose(self.phi_r, perm=[0,2,1,3]) 597 self.doppler = tf.transpose(self.doppler, perm=[0,2,1,3]) 598 else: 599 self.tau = tf.transpose(self.tau, perm=[0,3,4,1,2,5]) 600 self._min_tau = tf.transpose(self._min_tau, perm=[0,3,4,1,2,5]) 601 self.theta_t = tf.transpose(self.theta_t, perm=[0,3,4,1,2,5]) 602 self.phi_t = tf.transpose(self.phi_t, perm=[0,3,4,1,2,5]) 603 self.theta_r = tf.transpose(self.theta_r, perm=[0,3,4,1,2,5]) 604 self.phi_r = tf.transpose(self.phi_r, perm=[0,3,4,1,2,5]) 605 self.doppler = tf.transpose(self.doppler, perm=[0,3,4,1,2,5]) 606 607 self._reverse_direction = v 608 609 def cir(self, 610 los=True, 611 reflection=True, 612 diffraction=True, 613 scattering=True, 614 ris=True, 615 cluster_ris_paths=True, 616 num_paths=None): 617 # pylint: disable=line-too-long 618 r""" 619 Returns the baseband equivalent channel impulse response :eq:`h_b` 620 which can be used for link simulations by other Sionna components. 621 622 The baseband equivalent channel coefficients :math:`a^{\text{b}}_{i}` 623 are computed as : 624 625 .. math:: 626 a^{\text{b}}_{i} = a_{i} e^{-j2 \pi f \tau_{i}} 627 628 where :math:`i` is the index of an arbitrary path, :math:`a_{i}` 629 is the passband path coefficient (:attr:`~sionna.rt.Paths.a`), 630 :math:`\tau_{i}` is the path delay (:attr:`~sionna.rt.Paths.tau`), 631 and :math:`f` is the carrier frequency. 632 633 Note: For the paths of a given type to be returned (LoS, reflection, etc.), they 634 must have been previously computed by :meth:`~sionna.rt.Scene.compute_paths`, i.e., 635 the corresponding flags must have been set to `True`. 636 637 Input 638 ------ 639 los : bool 640 If set to `False`, LoS paths are not returned. 641 Defaults to `True`. 642 643 reflection : bool 644 If set to `False`, specular paths are not returned. 645 Defaults to `True`. 646 647 diffraction : bool 648 If set to `False`, diffracted paths are not returned. 649 Defaults to `True`. 650 651 scattering : bool 652 If set to `False`, scattered paths are not returned. 653 Defaults to `True`. 654 655 ris : bool 656 If set to `False`, paths involving RIS are not returned. 657 Defaults to `True`. 658 659 cluster_ris_paths : bool 660 If set to `True`, the paths from each RIS are coherently combined 661 into a single path, and the delays are averaged. 662 Note that this process is performed separately for each RIS. 663 For large RIS, clustering the paths significantly reduces the memory 664 required to run link-level simulations. 665 Defaults to `True`. 666 667 num_paths : int or `None` 668 All CIRs are either zero-padded or cropped to the largest 669 ``num_paths`` paths. 670 Defaults to `None` which means that no padding or cropping is done. 671 672 Output 673 ------- 674 a : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, num_time_steps], tf.complex 675 Path coefficients 676 677 tau : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] or [batch_size, num_rx, num_tx, max_num_paths], tf.float 678 Path delays 679 """ 680 681 ris = ris and (len(self._scene.ris) > 0) 682 683 # Select only the desired effects 684 types = self.types[0] 685 # [max_num_paths] 686 selection_mask = tf.fill(tf.shape(types), False) 687 if los: 688 selection_mask = tf.logical_or(selection_mask, 689 types == Paths.LOS) 690 if reflection: 691 selection_mask = tf.logical_or(selection_mask, 692 types == Paths.SPECULAR) 693 if diffraction: 694 selection_mask = tf.logical_or(selection_mask, 695 types == Paths.DIFFRACTED) 696 if scattering: 697 selection_mask = tf.logical_or(selection_mask, 698 types == Paths.SCATTERED) 699 if ris: 700 if cluster_ris_paths: 701 # Combine path coefficients from every RIS coherently and 702 # average their delays. 703 # This process is performed separately for each RIS. 704 # 705 # Extract paths coefficients and delays corresponding to RIS 706 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 707 # num_ris_paths, num_time_steps] 708 a_ris = tf.gather(self.a, tf.where(types == Paths.RIS)[:,0], 709 axis=-2) 710 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 711 # num_ris_paths] or [batch_size, num_rx,num_tx,num_ris_paths] 712 tau_ris = tf.gather(self.tau, tf.where(types == Paths.RIS)[:,0], 713 axis=-1) 714 # Loop over RIS to combine their path coefficients and delays 715 index_start = 0 716 index_end = 0 717 a_combined_ris_all = [] 718 tau_combined_ris_all = [] 719 for ris_ in self._scene.ris.values(): 720 index_end = index_start + ris_.num_cells 721 # Extract the path coefficients and delays corresponding to 722 # the paths from RIS 723 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 724 # num_this_ris_path, num_time_steps] 725 a_this_ris = a_ris[..., index_start:index_end,:] 726 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 727 # num_ris_paths] or [batch_size, num_rx,num_tx, 728 # num_ris_paths] 729 tau_this_ris = tau_ris[...,index_start:index_end] 730 # Average the delays 731 # It can happen that some paths are invalid with -1 delay 732 # We hence compute a mask for valid paths 733 valid_this_ris = tau_this_ris!=-1 734 # Set delay of invalid paths to zero 735 tau_this_ris = tf.where(valid_this_ris, tau_this_ris, 0) 736 # Compute number of valid paths 737 valid_this_ris = tf.cast(valid_this_ris, tf.float32) 738 num_valid_paths_this_ris = tf.reduce_sum(valid_this_ris, 739 axis=-1, 740 keepdims=True) 741 # Compute average delay over all valid paths 742 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 743 # 1] or [batch_size, num_rx,num_tx, 1] 744 mean_tau_this_ris = tf.reduce_sum(tau_this_ris, axis=-1, 745 keepdims=True) 746 mean_tau_this_ris /= num_valid_paths_this_ris 747 # Phase shift due to propagation delay. 748 # We subtract the average delay to ensure the propagation 749 # delay is not applied, only the phase shift due to the 750 # RIS geometry 751 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 752 # num_this_ris_path] or [batch_size, num_rx,num_tx, 753 # num_ris_paths] 754 tau_this_ris -= mean_tau_this_ris 755 ps = tf.complex(tf.zeros_like(tau_this_ris), 756 -2.*PI*self._scene.frequency*tau_this_ris) 757 # [batch_size, num_rx, num_rx_ant/1, num_tx, num_tx_ant/1, 758 # num_this_ris_path, 1] 759 ps = ps[...,tf.newaxis] 760 if self._scene.synthetic_array: 761 ps = tf.expand_dims(ps, 2) 762 ps = tf.expand_dims(ps, 4) 763 ps = tf.repeat(ps, a_this_ris.shape[2], axis=2) 764 ps = tf.repeat(ps, a_this_ris.shape[4], axis=4) 765 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 766 # num_this_ris_path, num_time_steps] 767 a_this_ris = a_this_ris*tf.exp(ps) 768 # Combine the paths coefficients and delays 769 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, 1, 770 # num_time_steps] 771 a_this_ris = tf.reduce_sum(a_this_ris, axis=-2, 772 keepdims=True) 773 a_combined_ris_all.append(a_this_ris) 774 tau_combined_ris_all.append(mean_tau_this_ris) 775 776 index_start = index_end 777 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_ris, 778 # num_time_steps] 779 a_combined_ris_all = tf.concat(a_combined_ris_all, axis=-2) 780 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_ris] 781 # or [batch_size, num_rx, num_tx, num_ris] 782 tau_combined_ris_all = tf.concat(tau_combined_ris_all, axis=-1) 783 784 else: 785 selection_mask = tf.logical_or(selection_mask, 786 types == Paths.RIS) 787 788 # Extract selected paths 789 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 790 # num_time_steps] 791 a = tf.gather(self.a, tf.where(selection_mask)[:,0], axis=-2) 792 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 793 # or [batch_size, num_rx, num_tx, max_num_paths] 794 tau = tf.gather(self.tau, tf.where(selection_mask)[:,0], axis=-1) 795 796 # If RIS paths were combined, add the results of the clustering 797 if ris and cluster_ris_paths: 798 a = tf.concat([a, a_combined_ris_all], axis=-2) 799 tau = tf.concat([tau, tau_combined_ris_all], axis=-1) 800 801 # Compute base-band CIR 802 # We now need to compute time evolution of a for num_time_steps 803 # This requires a dummy tau tensor. 804 if self._scene.synthetic_array: 805 tau_ = tf.expand_dims(tau, 2) 806 tau_ = tf.expand_dims(tau_, 4) 807 else: 808 tau_ = tau 809 810 # [batch_size, num_rx, 1/num_rx_ant, num_tx, 1/num_tx_ant, 811 # max_num_paths, num_time_steps, 1] 812 tau_ = tf.expand_dims(tau_, -1) 813 phase = tf.complex(tf.zeros_like(tau_), 814 -2*PI*self._scene.frequency*tau_) 815 # Manual repeat along the time step dimension as high-dimensional 816 # broadcast is not possible 817 phase = tf.repeat(phase, a.shape[-1], axis=-1) 818 a = a*tf.exp(phase) 819 820 if num_paths is not None: 821 a, tau = self.pad_or_crop(a, tau, num_paths) 822 823 return a,tau 824 825 ####################################################### 826 # Internal methods and properties 827 ####################################################### 828 829 @ property 830 def targets_sources_mask(self): 831 # pylint: disable=line-too-long 832 """ 833 [num_targets, num_sources, max_num_paths], tf.bool : Set to `False` for non-existent paths. 834 When there are multiple transmitters or receivers, path counts may vary between links. This is used to identify non-existent paths. 835 For such paths, the channel coefficient is set to `0` and the delay to `-1`. 836 Same as `mask`, but for sources and targets. 837 """ 838 return self._targets_sources_mask 839 840 @ targets_sources_mask.setter 841 def targets_sources_mask(self, v): 842 self._targets_sources_mask = v 843 844 @property 845 def vertices(self): 846 # pylint: disable=line-too-long 847 """ 848 [max_depth, num_targets, num_sources, max_num_paths, 3], tf.float : Positions of intersection points. 849 """ 850 return self._vertices 851 852 @vertices.setter 853 def vertices(self, v): 854 self._vertices = v 855 856 @property 857 def objects(self): 858 # pylint: disable=line-too-long 859 """ 860 [max_depth, num_targets, num_sources, max_num_paths], tf.int : Indices of the intersected scene objects 861 or wedges. Paths with depth lower than ``max_depth`` are padded with `-1`. 862 """ 863 return self._objects 864 865 @objects.setter 866 def objects(self, v): 867 self._objects = v 868 869 def merge(self, more_paths): 870 r""" 871 Merge ``more_paths`` with the current paths and returns the so-obtained 872 instance. `self` is not updated. 873 874 Input 875 ----- 876 more_paths : :class:`~sionna.rt.Paths` 877 First set of paths to merge 878 """ 879 880 dtype = self._scene.dtype 881 882 more_vertices = more_paths.vertices 883 more_objects = more_paths.objects 884 more_types = more_paths.types 885 886 # The paths to merge must have the same number of sources and targets 887 assert more_paths.targets.shape[0] == self.targets.shape[0],\ 888 "Paths to merge must have same number of targets" 889 assert more_paths.sources.shape[0] == self.sources.shape[0],\ 890 "Paths to merge must have same number of targets" 891 892 # Pad the paths with the lowest depth 893 padding = self.vertices.shape[0] - more_vertices.shape[0] 894 if padding > 0: 895 more_vertices = tf.pad(more_vertices, 896 [[0,padding],[0,0],[0,0],[0,0],[0,0]], 897 constant_values=tf.zeros((), 898 dtype.real_dtype)) 899 more_objects = tf.pad(more_objects, 900 [[0,padding],[0,0],[0,0],[0,0]], 901 constant_values=-1) 902 elif padding < 0: 903 padding = -padding 904 self.vertices = tf.pad(self.vertices, 905 [[0,padding],[0,0],[0,0],[0,0],[0,0]], 906 constant_values=tf.zeros((), dtype.real_dtype)) 907 self.objects = tf.pad(self.objects, 908 [[0,padding],[0,0],[0,0],[0,0]], 909 constant_values=-1) 910 911 # Merge types 912 if tf.rank(self.types) == 0: 913 merged_types = tf.repeat(self.types, tf.shape(self.vertices)[3]) 914 else: 915 merged_types = self.types 916 if tf.rank(more_types) == 0: 917 more_types = tf.repeat(more_types, tf.shape(more_vertices)[3]) 918 919 self.types = tf.concat([merged_types, more_types], axis=0) 920 921 # Concatenate all 922 self.a = tf.concat([self.a, more_paths.a], axis=2) 923 self.tau = tf.concat([self.tau, more_paths.tau], axis=2) 924 self.theta_t = tf.concat([self.theta_t, more_paths.theta_t], axis=2) 925 self.phi_t = tf.concat([self.phi_t, more_paths.phi_t], axis=2) 926 self.theta_r = tf.concat([self.theta_r, more_paths.theta_r], axis=2) 927 self.phi_r = tf.concat([self.phi_r, more_paths.phi_r], axis=2) 928 self.mask = tf.concat([self.mask, more_paths.mask], axis=2) 929 self.vertices = tf.concat([self.vertices, more_vertices], axis=3) 930 self.objects = tf.concat([self.objects, more_objects], axis=3) 931 self.doppler = tf.concat([self.doppler, more_paths.doppler], axis=2) 932 933 return self 934 935 def finalize(self): 936 """ 937 This function must be called to finalize the creation of the paths. 938 This function: 939 940 - Flags the LoS paths 941 942 - Computes the smallest delay for delay normalization 943 """ 944 945 self.set_los_path_type() 946 947 # Add dummy-dimension for batch_size 948 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 949 self.mask = tf.expand_dims(self.mask, axis=0) 950 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 951 self.a = tf.expand_dims(self.a, axis=0) 952 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 953 self.tau = tf.expand_dims(self.tau, axis=0) 954 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 955 self.theta_t = tf.expand_dims(self.theta_t, axis=0) 956 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 957 self.phi_t = tf.expand_dims(self.phi_t, axis=0) 958 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 959 self.theta_r = tf.expand_dims(self.theta_r, axis=0) 960 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 961 self.phi_r = tf.expand_dims(self.phi_r, axis=0) 962 # [1, max_num_paths] 963 self.types = tf.expand_dims(self.types, axis=0) 964 # [1, max_num_paths] 965 self.doppler = tf.expand_dims(self.doppler, axis=0) 966 967 tau = self.tau 968 if tau.shape[-1] == 0: # No paths 969 self._min_tau = tf.zeros_like(tau) 970 else: 971 zero = tf.zeros((), tau.dtype) 972 inf = tf.cast(np.inf, tau.dtype) 973 tau = tf.where(tau < zero, inf, tau) 974 if self._scene.synthetic_array: 975 # [1, num_rx, num_tx, 1] 976 min_tau = tf.reduce_min(tau, axis=3, keepdims=True) 977 else: 978 # [1, num_rx, 1, num_tx, 1, 1] 979 min_tau = tf.reduce_min(tau, axis=(2, 4, 5), keepdims=True) 980 min_tau = tf.where(tf.math.is_inf(min_tau), zero, min_tau) 981 self._min_tau = min_tau 982 983 # Add the time steps dimension 984 # [1, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths, 1] 985 self.a = tf.expand_dims(self.a, axis=-1) 986 987 # Normalize delays 988 self.normalize_delays = True 989 990 def set_los_path_type(self): 991 """ 992 Flags paths that do not hit any object as LoS 993 """ 994 995 # [max_depth, num_targets, num_sources, num_paths] 996 objects = self.objects 997 # [num_targets, num_sources, num_paths] 998 mask = self.targets_sources_mask 999 1000 if objects.shape[3] > 0: 1001 # [num_targets, num_sources, num_paths] 1002 los_path = tf.reduce_all(objects == -1, axis=0) 1003 # [num_targets, num_sources, num_paths] 1004 los_path = tf.logical_and(los_path, mask) 1005 # [num_paths] 1006 los_path = tf.reduce_any(los_path, axis=(0,1)) 1007 # [[1]] 1008 los_path_index = tf.where(los_path) 1009 updates = tf.repeat(Paths.LOS, tf.shape(los_path_index)[0], 0) 1010 self.types = tf.tensor_scatter_nd_update(self.types, 1011 los_path_index, 1012 updates) 1013 1014 def pad_or_crop(self, a, tau, k): 1015 """ 1016 Enforces that CIRs have exactly k paths by either 1017 zero-padding of cropping the weakest paths 1018 """ 1019 max_num_paths = a.shape[-2] 1020 1021 # Crop 1022 if k<max_num_paths: 1023 # Compute indices of the k strongest paths 1024 # As is independent of the number of time steps, 1025 # Therefore, we use only the first one a[...,0]. 1026 # ind : [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k] 1027 _, ind = tf.math.top_k(tf.abs(a[...,0]), k=k, sorted=True) 1028 1029 # Gather the strongest paths 1030 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k, num_time_steps] 1031 a = tf.gather(a, ind, batch_dims=5) 1032 1033 # Gather the corresponding path delays 1034 # Synthetic array 1035 if tf.rank(tau)==4: 1036 # tau : [batch_size, num_rx, num_tx, max_num_paths] 1037 1038 # Get relevant indices 1039 # [batch_size, num_rx, num_tx, k] 1040 ind_tau = ind[:,:,0,:,0] 1041 1042 # [batch_size, num_rx, num_tx, k] 1043 tau = tf.gather(tau, ind_tau, batch_dims=3) 1044 1045 # Non-synthetic array 1046 else: 1047 # tau: [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, max_num_paths] 1048 1049 # [batch_size, num_rx, num_rx_ant, num_tx, num_tx_ant, k] 1050 tau = tf.gather(tau, ind, batch_dims=5) 1051 1052 # Pad 1053 elif k>max_num_paths: 1054 # Pad paths with zeros 1055 pad_size = k-max_num_paths 1056 1057 # Paddings for the paths gains 1058 paddings = tf.constant([[0, 0] if i != 5 else [0, pad_size] for i in range(7)]) 1059 a = tf.pad(a, paddings=paddings, mode='CONSTANT', constant_values=0) 1060 1061 # Paddings for the delays (-1 by Sionna convention) 1062 paddings = tf.constant([[0, 0] if i != tf.rank(tau)-1 else [0, pad_size] for i in range(tf.rank(tau))]) 1063 tau = tf.pad(tau, paddings=paddings, mode='CONSTANT', constant_values=-1) 1064 1065 return a, tau