-
Notifications
You must be signed in to change notification settings - Fork 0
/
rx_data.py
83 lines (69 loc) · 3.2 KB
/
rx_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""AI based RX
name: RX data generation module
status: initial
output file pattern: 'data/data_{iq}/{iq}_t{tau:.1f}_{s}{snr}.csv'
v0.0.2 >
last update: (16 May 2024, 21:30)
Naming: Modulation_TAU_SNR (fixed GroupDelay as 4, FS = 10)
"""
import os
import numpy as np
import pandas as pd
from ber_util import gen_data, add_awgn
from rx_utils import get_data, show_train, check_data, prep_ts_data, get_song_data
from rx_config import init_gpu
from constants import FS, G_DELAY, dict_h, snr_to_nos
# init_gpu()  # call is left disabled in this script

# ---------------------------------------------------------------------------
# Simulation grid: one CSV file is produced per (TAU, SNR) combination.
# ---------------------------------------------------------------------------
# Modulation type; options are 'bpsk' and 'qpsk'.
IQ = 'bpsk'
# TAU values to sweep: 0.50, 0.60, 0.70, 0.80, 0.90, 1.00
TAU = [0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
# SNR levels to sweep: the integers 0..16 followed by the noise-free case.
SNR = list(range(17)) + ['NoNoise']

# FUTURE: support a variable sampling frequency, FS.
# Until then, refuse to run with anything other than FS == 10.
assert FS == 10, (
    "Only FS=10 supported, current implementation does not support 'FS: {fs}'"
    .format(fs=FS)
)

# Filter taps looked up by G_DELAY and stored in half precision.
hPSF = np.array(dict_h[G_DELAY]).astype(np.float16)
# The same taps are used for the TX filter and the RX matched filter below,
# so they must read the same forwards and backwards.
assert np.array_equal(hPSF[::-1], hPSF), 'symmetry mismatch!'
# [SOURCE] Data Generation
# TODO: add other modulation type [Only bpsk supported]
#
# For every (tau, snr) pair: generate symbols, up-sample, filter with hPSF,
# pass through the AWGN channel, apply the matched filter, down-sample and
# store the received samples together with the transmitted bits as CSV.
# Pairs whose CSV file already exists are skipped, so the script can be
# re-run after an interruption without regenerating finished files.

# Number of samples trimmed from each end of the matched-filter output.
# Per the original note: 4*10=40 from the first conv@TX and +40 from the last
# conv@RX for G_DELAY=4, FS=10.
# NOTE(review): assumes hPSF has 2*G_DELAY*FS + 1 taps - confirm against
# constants.dict_h.
trim = 2 * G_DELAY * FS

for tau in TAU:  # DEBUG [TAU[i]]
    # symbol spacing in samples after up-sampling
    step = int(tau * FS)
    for snr in SNR:
        # use at least 10**6 symbols, or more when snr_to_nos asks for more
        NoS = max(snr_to_nos[snr], 10**6)
        # integer SNR levels get an 's' prefix in the file name; the string
        # level 'NoNoise' is used verbatim
        data_filename = 'data/data_{iq}/{iq}_t{tau:.1f}_{s}{snr}.csv'.format(
            iq=IQ,
            tau=tau,
            s='s' if isinstance(snr, int) else '',
            snr=snr)
        if os.path.exists(data_filename):
            print('[Skip]\t{file} already exist.'.format(file=data_filename))
            continue

        # Create the output folder up front: without it, to_csv() fails with
        # OSError only after the whole (expensive) generation has finished.
        os.makedirs(os.path.dirname(data_filename), exist_ok=True)

        print("generating {nod} for {name}..".format(nod=NoS, name=data_filename))
        data, bits = gen_data(n=NoS, mod=IQ, seed=43523)  # IQ options: ('bpsk', 'qpsk')

        # [TX] up-sample
        # place one symbol every `step` samples, zeros in between
        # (in order to be able to apply FTN)
        s_up_sampled = np.zeros(step * len(data), dtype=np.float16)
        s_up_sampled[::step] = data

        # [TX] apply FTN (tau): filter the up-sampled symbol stream
        tx_data = np.convolve(hPSF, s_up_sampled)

        # [CHANNEL] add AWGN noise (snr)
        rch = add_awgn(inputs=tx_data, snr=snr, seed=1234)

        # [RX] apply matched filter
        mf = np.convolve(hPSF, rch)

        # [RX] down-sample (subsample)
        # drop the prefix/suffix samples added by the two convolutions, then
        # keep every `step`-th sample
        rx_data = mf[trim:-trim:step]

        # save to csv: column 'y' holds the transmitted bits, column 'X' the
        # received samples
        df = pd.DataFrame.from_dict({'y': bits, 'X': rx_data})
        df.to_csv(data_filename, index=False)