anomaly-detection-material-parameters-calibration

Sionna param calibration (research proj)
git clone https://git.ea.contact/anomaly-detection-material-parameters-calibration
Log | Files | Refs | README

layer_mapping.py (10479B)


      1 #
      2 # SPDX-FileCopyrightText: Copyright (c) 2021-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
      3 # SPDX-License-Identifier: Apache-2.0
      4 #
      5 """Layer mapping for the 5G NR sub-package of the Sionna library.
      6 """
      7 
      8 import tensorflow as tf
      9 from tensorflow.keras.layers import Layer
     10 from sionna.utils import flatten_last_dims, split_dim
     11 
     12 class LayerMapper(Layer):
     13     # pylint: disable=line-too-long
     14     r"""LayerMapper(num_layers=1, verbose=False, **kwargs)
     15     Performs MIMO layer mapping of modulated symbols to layers as defined in
     16     [3GPP38211]_.
     17 
     18     The LayerMapper supports PUSCH and PDSCH channels and follows the procedure
     19     as defined in Sec. 6.3.1.3 and Sec. 7.3.1.3 in [3GPP38211]_, respectively.
     20 
     21     As specified in Tab. 7.3.1.3.-1 [3GPP38211]_, the LayerMapper expects two
     22     input streams for multiplexing if more than 4 layers are active (only
     23     relevant for PDSCH).
     24 
     25     The class inherits from the Keras layer class and can be used as layer in a
     26     Keras model.
     27 
     28     Parameters
     29     ----------
     30         num_layers: int, 1 (default) | [1,...,8]
     31             Number of MIMO layers. If
     32             ``num_layers`` >=4, a list of two inputs is expected.
     33 
     34         verbose: bool, False (default)
     35             If True, additional parameters are printed.
     36 
     37     Input
     38     -----
     39         inputs: [...,n], or [[...,n1], [...,n2]], tf.complex
     40             2+D tensor containing the sequence of symbols to be mapped. If
     41             ``num_layers`` >=4, a list of two inputs is expected and `n1`/`n2`
     42             must be chosen as defined in Tab. 7.3.1.3.-1 [3GPP38211]_.
     43 
     44     Output
     45     ------
     46         : [...,num_layers, n/num_layers], tf.complex
     47             2+D tensor containing the sequence of symbols mapped to the MIMO
     48             layers.
     49     """
     50 
     51     def __init__(self,
     52                  num_layers=1,
     53                  verbose=False,
     54                  **kwargs):
     55 
     56         super().__init__(**kwargs)
     57 
     58         assert isinstance(verbose, bool), "verbose must be bool"
     59         self._verbose = verbose
     60 
     61         assert num_layers in range(1,9), \
     62                             'num_layers must be between 1 and 8.'
     63         self._num_layers = num_layers
     64 
     65         # follow Tab. 7.3.1.3-1 from 38.211 for CW multiplexing
     66         if self._num_layers<5:
     67             self._num_codewords=1
     68         elif self._num_layers==5:
     69             self._num_codewords=2
     70             self._num_layers0 = 2
     71             self._num_layers1 = 3
     72         elif self._num_layers==6:
     73             self._num_codewords=2
     74             self._num_layers0 = 3
     75             self._num_layers1 = 3
     76         elif self._num_layers==7:
     77             self._num_codewords=2
     78             self._num_layers0 = 3
     79             self._num_layers1 = 4
     80         elif self._num_layers==8:
     81             self._num_codewords=2
     82             self._num_layers0 = 4
     83             self._num_layers1 = 4
     84         else:
     85             raise ValueError("Invalid number of layers.")
     86 
     87         if self._verbose: # provide information about layer configuration
     88             print("Number of layers: ", self._num_layers)
     89             if self._num_codewords==2:
     90                 print("Dual codeword mode active and cw multiplexing as " \
     91                       "defined in Tab. 7.3.1.3-1 from 38.211 applied.")
     92                 print(f"Length of cw1/cw2: {self._num_layers0}/"\
     93                       f"{self._num_layers1} ")
     94 
     95     #########################################
     96     # Public methods and properties
     97     #########################################
     98 
     99     @property
    100     def num_codewords(self):
    101         """Number of input codewords for layer mapping. Can be either 1 or 2."""
    102         return self._num_codewords
    103 
    104     @property
    105     def num_layers(self):
    106         """ Number of MIMO layers"""
    107         return self._num_layers
    108 
    109     @property
    110     def num_layers0(self):
    111         r"""Number of layers for first codeword (only relevant for
    112         `num_codewords` =2)"""
    113         if self._num_codewords==1:
    114             return self._num_layers
    115         return self._num_layers0
    116 
    117     @property
    118     def num_layers1(self):
    119         r"""Number of layers for second codeword (only relevant for
    120         `num_codewords` =2)"""
    121         if self._num_codewords==1:
    122             return 0 # no second stream
    123         return self._num_layers1
    124 
    125     def build(self, input_shapes):
    126         """Test input shapes for consistency."""
    127 
    128         if self._num_codewords==1: # single cw mode
    129             assert not isinstance(input_shapes[0], tf.TensorShape),\
    130                             "Only single input codeword expected."
    131             assert input_shapes[-1]%self._num_layers==0,\
    132                     "Invalid input dimensions: last dimension must be a " \
    133                     "multiple of num_layers."
    134         else: # dual cw mode
    135             # inputs must be a list of two streams
    136             s0 = input_shapes[0].as_list()
    137             s1 = input_shapes[1].as_list()
    138             assert isinstance(s0, list), \
    139                             "List of two inputs streams is expected."
    140             assert isinstance(s1, list), \
    141                             "List of two inputs streams is expected."
    142 
    143             assert s0[-1]%self._num_layers0==0,\
    144                     "Invalid input dimensions: last dimension of first input "\
    145                     "must be a multiple of num_layers0."
    146             assert s1[-1]%self._num_layers1==0,\
    147                     "Invalid input dimensions: last dimension of second input "\
    148                     "must be a multiple of num_layers1."
    149 
    150             # verify that length of tb1 and tb2 fit together
    151             assert s0[-1]/self._num_layers0 == s1[-1]/self._num_layers1, \
    152                     f"Invalid input dimensions: length of first input must be "\
    153                     f"{self._num_layers0/self._num_layers1:.2f} of the length "\
    154                     f"of the second input."
    155 
    156     def call(self, inputs):
    157         """Applies MIMO Layer mapping as defined in Sec. 6.3.1.3 and Sec.
    158         7.3.1.3 38.211."""
    159 
    160         if self._num_codewords==1:
    161             s = inputs.shape[-1]
    162             y = split_dim(inputs,(int(s/self._num_layers), self._num_layers),
    163                           axis=len(inputs.shape)-1)
    164         else:
    165             # for PDSCH only: support dual stream multiplexing
    166             x0 = inputs[0]
    167             x1 = inputs[1]
    168             s0 = x0.shape[-1]
    169             s1 = x1.shape[-1]
    170 
    171             y0 = split_dim(x0,(int(s0/self._num_layers0), self._num_layers0),
    172                            axis=len(x0.shape)-1)
    173             y1 = split_dim(x1,(int(s1/self._num_layers1), self._num_layers1),
    174                            axis=len(x1.shape)-1)
    175 
    176             y = tf.concat([y0, y1], axis=-1)
    177 
    178         # swap last two dimensions
    179         y = tf.experimental.numpy.swapaxes(y, axis1=-1, axis2=-2)
    180         return y
    181 
    182 class LayerDemapper(Layer):
    183     # pylint: disable=line-too-long
    184     r"""LayerDemapper(layer_mapper, num_bits_per_symbol=1, **kwargs)
    185     Demaps MIMO layers to coded transport block(s) by following Sec. 6.3.1.3
    186     and Sec. 7.3.1.3 in [3GPP38211]_.
    187 
    188     This layer must be associated to a :class:`~sionna.nr.LayerMapper` and
    189     performs the inverse operation.
    190 
    191     It is assumed that ``num_bits_per_symbol`` consecutive LLRs belong to
    192     a single symbol position. This allows to apply the LayerDemapper after
    193     demapping symbols to LLR values.
    194 
    195     If the layer mapper is configured for dual codeword transmission, a list of
    196     both transport block streams is returned.
    197 
    198     The class inherits from the Keras layer class and can be used as layer in a
    199     Keras model.
    200 
    201     Parameters
    202     ----------
    203         layer_mapper: :class:`~sionna.nr.LayerMapper`
    204             Associated LayerMapper.
    205 
    206         num_bits_per_symbol: int, 1 (default)
    207             Modulation order. Defines how many consecutive LLRs are associated
    208             to the same symbol position.
    209 
    210     Input
    211     -----
    212         inputs : [...,num_layers, n/num_layers], tf.float
    213             2+D tensor containing MIMO layer data sequences.
    214 
    215     Output
    216     ------
    217         : [...,n], or [[...,n1], [...,n2]], tf.float
    218             2+D tensor containing the sequence of bits after layer demapping.
    219             If ``num_codewords`` =2, a list of two transport blocks is returned.
    220 
    221     Note
    222     ----
    223     As it is more convenient to apply the layer demapper after demapping
    224     symbols to LLRs, this layer groups the input sequence into groups of
    225     ``num_bits_per_symbol`` LLRs before restoring the original symbol sequence.
    226     This behavior can be deactivated by setting ``num_bits_per_symbol`` =1.
    227     """
    228 
    229     def __init__(self,
    230                  layer_mapper,
    231                  num_bits_per_symbol=1,
    232                  **kwargs):
    233 
    234         super().__init__(**kwargs)
    235 
    236         assert isinstance(layer_mapper, LayerMapper), \
    237                     "layer_mapper must be LayerMapper."
    238         self._mapper = layer_mapper
    239 
    240         assert num_bits_per_symbol%1==0, \
    241                     "num_bits_per_symbol must be int."
    242         self._num_bits_per_symbol = num_bits_per_symbol
    243 
    244     def build(self, input_shapes):
    245         """Test input shapes for consistency."""
    246 
    247         # check that second last dimension equals number of expected streams
    248         num_layers = self._mapper.num_layers
    249         assert input_shapes.as_list()[-2]==num_layers, \
    250             "Invalid input dimension: input shape must be [...,num_layers,n]."
    251 
    252         assert input_shapes.as_list()[-1]%self._num_bits_per_symbol==0, \
    253             "Invalid input dimension: last dimension must be a multiple of " \
    254             "num_bits_per_symbol."
    255 
    256     def call(self, inputs):
    257         """Demaps multiple layers back to transport block stream(s)."""
    258 
    259         # group llrs into blocks of num_bits_per_symbol values
    260         s = inputs.shape[-1]
    261         x = split_dim(inputs,
    262                      (int(s/self._num_bits_per_symbol),
    263                       self._num_bits_per_symbol),
    264                      axis=len(inputs.shape)-1)
    265 
    266         # swap last dimensions
    267         x = tf.experimental.numpy.swapaxes(x, axis1=-2, axis2=-3)
    268 
    269         if self._mapper.num_codewords==1:
    270             y = flatten_last_dims(x, num_dims=3)
    271             return y
    272         else:
    273             # multiplex into two codewords/streams
    274             # only relevant for PDSCH with dual codeword transmission
    275 
    276             y0 = flatten_last_dims(x[...,:self._mapper.num_layers0,:],
    277                                    num_dims=3)
    278             y1 = flatten_last_dims(x[...,self._mapper.num_layers0:,:],
    279                                    num_dims=3)
    280             return [y0, y1]
    281