URL
https://opencores.org/ocsvn/dmt_tx/dmt_tx/trunk
Subversion Repositories dmt_tx
Compare Revisions
- This comparison shows the changes necessary to convert path
/dmt_tx
- from Rev 29 to Rev 30
- ↔ Reverse comparison
Rev 29 → Rev 30
/trunk/myhdl/test/test_bit_order.py
0,0 → 1,283
#!/usr/bin/env python |
|
import unittest |
import os |
from myhdl import * |
|
import random |
|
|
if __name__ == '__main__': |
import sys |
sys.path.append('../') |
|
from rtl.bit_order import bit_order |
|
|
class TestBitOrder(unittest.TestCase): |
|
def test_bit_order_simple(self): |
|
def bench(fp_dataL, ip_dataL, fip_bitL, expOutL): |
'''Verify the proper function of the bit ordering block. |
|
Function gets passed the input data for the fast and the |
inteleaved path, a list with bits to load from the fast and/or |
interleaved path and the expected output word. |
''' |
|
DWIDTH = 8 |
|
# setup signals |
clk, reset, \ |
fast_data_avail_i, fast_rden_o, \ |
inter_data_avail_i, inter_rden_o, \ |
ready_o, enable_i, \ |
data_out_valid_o \ |
= [Signal(bool(0)) for i in range(9)] |
|
fast_data_i, inter_data_i = [Signal(intbv(0)[DWIDTH:]) for i in range(2)] |
|
fast_bits_i, inter_bits_i = [Signal(intbv(0)[4:]) for i in range(2)] |
data_out_o = Signal(intbv(0)[15:]) |
|
# |
# instantiate DUT |
# |
bit_order_inst = bit_order(clk, reset, |
fast_data_avail_i, fast_rden_o, fast_data_i, |
inter_data_avail_i, inter_rden_o, inter_data_i, |
ready_o, enable_i, fast_bits_i, inter_bits_i, |
data_out_valid_o, data_out_o) |
|
@always(delay(10)) |
def clkgen(): |
clk.next = not clk |
|
@instance |
def sendInputData(): |
|
yield reset.negedge |
#print "Got the reset.negedge at %d"%now() |
yield join( fifo(clk, fast_data_avail_i, fast_rden_o, fast_data_i, fp_dataL), |
fifo(clk, inter_data_avail_i, inter_rden_o, inter_data_i, ip_dataL) |
) |
|
@instance |
def ctrlBitOrdering(): |
|
# apply reset |
yield clk.negedge |
reset.next = 1 |
yield clk.negedge |
reset.next = 0 |
|
yield applyBitValues(clk, ready_o, enable_i, |
fast_bits_i, inter_bits_i, |
fip_bitL) |
|
|
@instance |
def verify(): |
|
yield verifyOutput(clk, data_out_valid_o, data_out_o, expOutL) |
|
raise StopSimulation |
|
return instances() |
|
##################################### |
|
# generate test data |
fp_dataL, ip_dataL, fip_bitL = genData() |
|
# calculate expected output |
expOutL = calcExpectedOutput(fp_dataL, ip_dataL, fip_bitL) |
|
|
tb = bench(fp_dataL, ip_dataL, fip_bitL, expOutL) |
#tb = traceSignals(bench, fp_dataL, ip_dataL, fip_bitL, expOutL) |
sim = Simulation(tb) |
sim.run() |
|
########################################################################## |
# bus functional models |
def verifyOutput(clk, data_out_valid_o, data_out_o, expOutL, |
TIMEOUT=10): |
'''Bus function model for the bit order block output |
|
The expected output is provided with expOutL as a list of integers. |
The function will wait for an active data_out_valid_o signal and |
compare the output with a sample of the list. |
''' |
#print "Expected values: ", expOutL |
|
for exp_value in expOutL: |
to_ctr = 0 |
while data_out_valid_o == 0: |
yield clk.negedge |
to_ctr += 1 |
if to_ctr > TIMEOUT: |
raise StopSimulation, "verifyOutput timeout ERROR" |
|
#print "data_out_valid_o: %d at %d"%(data_out_valid_o, now()) |
#print "data_out_o: 0x%x expected: 0x%x"%(data_out_o, exp_value) |
|
yield data_out_valid_o.negedge |
|
|
def applyBitValues(clk, ready_o, enable_i, fast_bits_i, inter_bits_i, fip_bitL): |
'''BFM for setting the fast and interleaved bits and enabling |
processing |
|
Waits on an active ready_o and applies the fast_bits_i and |
inter_bits_i data. Then sets enable_i active. Feeds all data from |
fip_bitL to the module this way. |
''' |
#print "Bit list: ", fip_bitL |
for fbits, ibits in fip_bitL: |
|
# wait until ready is active |
while ready_o == 0: |
yield clk.negedge |
|
fast_bits_i.next = fbits |
inter_bits_i.next = ibits |
enable_i.next = 1 |
#print "applying %d, %d at %d"%(fbits, ibits, now()) |
|
yield clk.negedge |
enable_i.next = 0 |
|
|
def fifo(clk, data_avail_i, rden_o, data_i, dataL): |
'''BFM for sending the fast and interleaved data over the FIFO |
interface |
|
Waits on a request from the bit order block and sends a data word out. |
''' |
data_avail_i.next = 1 |
#print "FIFO data: ", dataL |
|
for data in dataL: |
data_i.next = data |
while rden_o == 0: |
yield clk.posedge |
|
|
########################################################################## |
# functions for data generation and calculating expected output |
def genData(): |
'''Generate test data for the bit order block verification |
Return fp_dataL, ip_dataL, fip_bitL |
|
fp_dataL : contains the data for the fast path. One byte per |
entry. Number of bytes (sum of data bits) matches |
with the sum of bits in fip_bitL for the fast path |
ip_dataL : contains the data for the interleaved path. One |
byte per entry. Number of bytes (sum of data bits) |
matches with the sum of bits in fip_bitL for the |
interleaved path |
fip_bitL : list of tuples, with tuples of size 2. Index 0 is |
the number of bits from the fast path and index 1 |
is the number of bits from the interlaved path. |
''' |
|
#fp_bitL = [i for i in range(2,16)] |
fp_bitL = [2,8] |
#random.shuffle(fp_bitL) |
bit_sum = sum(fp_bitL) |
rem_bit_sum = bit_sum % 8 |
# there is no carrier load of 1 bit, |
# so in case we need to add 1, add 9 |
if rem_bit_sum == 1: |
fp_bitL.append(9) |
elif rem_bit_sum > 1: |
fp_bitL.append(8-rem_bit_sum) |
|
#print "fp_bitL: ", fp_bitL |
|
ip_bitL = [0]*len(fp_bitL) |
|
fip_bitL = [] |
for i, fp_bits in enumerate(fp_bitL): |
fip_bitL.append((fp_bits, ip_bitL[i])) |
|
fp_bit_sum = sum(fp_bitL) |
ip_bit_sum = sum(ip_bitL) |
|
# finished setup the bit list tables |
|
fp_dataL = [] |
ip_dataL = [] |
byte_num = fp_bit_sum / 8 |
for i in range(byte_num): |
value = random.randrange(256) |
#fp_dataL.append(value) |
fp_dataL.append(16+ i%256) |
#print i, i%256 |
|
return fp_dataL, ip_dataL, fip_bitL |
|
|
def calcExpectedOutput(fp_dataL, ip_dataL, fip_bitL): |
'''Calculate the expected output of the bit order block |
Return a list with the expected out data words |
''' |
expOutL = [] |
|
# construct a list of bool values from the data. Data for each path |
# come in as one byte per list entry |
fpL = [] |
for byte in fp_dataL: |
fpL.extend(byteToBoolList(byte)) |
|
ipL = [] |
for byte in ip_dataL: |
ipL.extend(byteToBoolList(byte)) |
|
|
# reverse the lists, so that the lsb is index -1, as the pop() gets |
fpL.reverse() |
ipL.reverse() |
# the data from the end of the list |
for fp_bits, ip_bits in fip_bitL: |
data_out = 0 |
# get first the bits from the fast path |
if fpL: |
for i in range(fp_bits): |
bit = fpL.pop() |
data_out |= bit << i |
|
# now go on with the interleaved path |
if ipL: |
for i in range(ip_bits): |
bit = ipL.pop() |
data_out |= bit << i |
|
expOutL.append(data_out) |
|
|
return expOutL |
|
|
|
def byteToBoolList(byte): |
'''Convert an integer byte value to a list of bool types |
index 0 of the list is lsb |
''' |
retL = [] |
for i in range(8): |
bit = (byte >> i) & 0x1 |
if bit: |
retL.append(1) |
else: |
retL.append(0) |
|
return retL |
|
######################################################################## |
# main |
if __name__ == '__main__': |
suite = unittest.TestLoader().loadTestsFromTestCase(TestBitOrder) |
unittest.TextTestRunner(verbosity=2).run(suite) |
trunk/myhdl/test/test_bit_order.py
Property changes :
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: trunk/myhdl/test/test_fifo_sync.py
===================================================================
--- trunk/myhdl/test/test_fifo_sync.py (nonexistent)
+++ trunk/myhdl/test/test_fifo_sync.py (revision 30)
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+
+import unittest
+import os
+from myhdl import *
+
+if __name__ == '__main__':
+ import sys
+ sys.path.append('../')
+
+from rtl.fifo_sync import fifo_sync
+
+
+class TestFifo(unittest.TestCase):
+
+ def test_fifo_simple(self):
+
+ def bench():
+
+ DWIDTH = 4
+ SIZE = 4
+
+ clk, reset, full_o, wen_i, data_avail_o, rden_i = \
+ [Signal(bool(0)) for i in range(6)]
+
+ data_i, data_o = [Signal(intbv(0)[DWIDTH:]) for i in range(2)]
+
+ fifo_inst = fifo_sync( clk, reset,
+ full_o, wen_i, data_i,
+ data_avail_o, rden_i, data_o,
+ DWIDTH,SIZE)
+
+ @always(delay(10))
+ def clkgen():
+ clk.next = not clk
+
+ @instance
+ def stim_and_check():
+ reset.next = 1
+ yield clk.negedge
+ reset.next = 0
+ yield clk.negedge
+
+ self.assertEqual(full_o, 0)
+ self.assertEqual(data_avail_o, 0)
+
+ # write fifo full
+ yield clk.negedge
+ for i in range(SIZE+1):
+
+ data_i.next = i
+ wen_i.next = 1
+ yield clk.negedge
+
+ if i < SIZE-1:
+ msg = "full (0): %d at %d"%(full_o, now())
+ self.assertEqual(full_o, 0, msg)
+ else:
+ msg = "full (1): %d at %d"%(full_o, now())
+ self.assertEqual(full_o, 1, msg)
+
+ wen_i.next = 0
+
+ # read fifo back empty
+ yield clk.negedge
+ rden_i.next = 1
+
+ yield clk.posedge
+ self.assertEqual(full_o, 1)
+
+ yield delay(1)
+ self.assertEqual(full_o, 0)
+
+ for i in range(SIZE+1):
+ yield clk.negedge
+
+ if i < SIZE-1:
+ msg = "i: %d data_o: %d at %d"%(i, data_o, now())
+ self.assertEqual(data_o, i, msg)
+ elif i == SIZE-1:
+ msg = "i: %d data_o: %d at %d"%(i, data_o, now())
+ self.assertEqual(data_o, SIZE-1, msg)
+ self.assertEqual(data_avail_o, 0)
+
+ rden_i.next = 0
+ yield clk.negedge
+
+ #
+ # fill 2 words, read/write 3 words, should not get full
+ #
+
+ # 1st data
+ data_i.next = 1
+ wen_i.next = 1
+ yield clk.negedge
+
+ self.assertEqual(data_avail_o, 1)
+
+ # 2nd data
+ data_i.next = 2
+ yield clk.negedge
+
+ # write and read
+ rden_i.next = 1
+ for i in range(3):
+ data_i.next = 11 + i
+ yield clk.negedge
+
+ self.assertEqual(full_o, 0)
+ self.assertEqual(data_avail_o, 1)
+
+ if i == 0:
+ self.assertEqual(data_o, 1)
+ elif i == 1:
+ self.assertEqual(data_o, 2)
+ elif i == 2:
+ self.assertEqual(data_o, 11)
+
+ wen_i.next = 0
+
+ yield clk.negedge
+ self.assertEqual(full_o, 0)
+ self.assertEqual(data_avail_o, 1)
+ self.assertEqual(data_o, 12)
+
+ yield clk.negedge
+ self.assertEqual(full_o, 0)
+ self.assertEqual(data_avail_o, 0)
+ self.assertEqual(data_o, 13)
+
+
+ yield clk.negedge
+ self.assertEqual(full_o, 0)
+ self.assertEqual(data_avail_o, 0)
+ self.assertEqual(data_o, 13)
+
+ rden_i.next = 0
+
+ raise StopSimulation
+
+ return instances()
+
+ tb = bench()
+ #tb = traceSignals(bench)
+ sim = Simulation(tb)
+ sim.run()
+
+########################################################################
+# main
+if __name__ == '__main__':
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestFifo)
+ unittest.TextTestRunner(verbosity=2).run(suite)
trunk/myhdl/test/test_fifo_sync.py
Property changes :
Added: svn:executable
## -0,0 +1 ##
+*
\ No newline at end of property
Index: trunk/myhdl/test/test_const_encoder.py
===================================================================
--- trunk/myhdl/test/test_const_encoder.py (revision 29)
+++ trunk/myhdl/test/test_const_encoder.py (revision 30)
@@ -46,8 +46,14 @@
yield clk.negedge
reset.next = 0
+
+ # quick test
+ sizeL = [3, 4, 5, 6, 7, 8, 9, 10, 11]
- for size in [4, 5]:
+ # full test
+ #sizeL = [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
+
+ for size in sizeL:
#print
#print 'Testing const_size %d'%size
#print
/trunk/myhdl/rtl/bit_order.py
0,0 → 1,221
|
|
from myhdl import * |
|
|
def bit_order(clk, reset, |
fast_data_avail_i, fast_rden_o, fast_data_i, |
inter_data_avail_i, inter_rden_o, inter_data_i, |
ready_o, enable_i, fast_bits_i, inter_bits_i, |
data_out_valid_o, data_out_o, |
DWIDTH=15): |
'''Bit order block |
|
I/O pins: |
========= |
clk : clock input |
reset : reset input |
|
fast_data_avail_i : signal that there are fast data available |
fast_rden_o : read pulse for the fast data input |
fast_data_i : fast data input. Data come from the FIFO and are 8 |
bit wide |
|
inter_data_avail_i : signal that there are interleaved data available |
inter_rden_o : read pulse for the interleaved data input |
inter_data_i : interleaved data input. Data come from the FIFO |
and are 8 bit wide |
|
ready_o : signal that with enable_i a new processing cycle |
can start |
enable_i : pulse to enable the processing for the set input |
data |
fast_bits_i : number of bits to be extracted from the fast data |
path |
inter_bits_i : number of bits to be extracted from the |
interleaved data path |
|
data_out_valid_o : signal that after an enable_i pulse the data_out_o |
value is valid |
data_out_o : DWIDTH bit wide output data |
|
|
parameters: |
=========== |
|
DWIDTH : output data width. Resembles the maximum number |
of bits to be loaded per carrier |
''' |
|
# |
# signal setup |
|
t_state = enum('IDLE', 'FP_LOAD', 'IP_LOAD', 'FIP_SHIFT', 'DOUT') |
|
state = Signal(t_state.IDLE) |
|
SWIDTH = (DWIDTH-1-8) + 2*8 # shift register width |
|
fp_shift_reg, ip_shift_reg \ |
= [Signal(intbv(0)[SWIDTH:]) for i in range(2)] |
|
fp_process, ip_process, fp_needs_load, ip_needs_load \ |
= [Signal(bool(0)) for i in range(4)] |
|
fp_shift_load, ip_shift_load \ |
= [Signal(int(0)) for i in range(2)] |
|
|
# |
# logic |
|
@always_comb |
def comb_logic_ready(): |
if state == t_state.IDLE or data_out_valid_o: |
ready_o.next = 1 |
else: |
ready_o.next = 0 |
|
@always_comb |
def comb_logic(): |
fp_needs_load.next = fast_bits_i > fp_shift_load |
ip_needs_load.next = inter_bits_i > ip_shift_load |
|
fp_process.next = fast_bits_i > 0 |
ip_process.next = inter_bits_i > 0 |
|
|
@always(clk.posedge, reset.posedge) |
def fsm(): |
|
if reset: |
state.next = t_state.IDLE |
else: |
|
if state == t_state.IDLE: |
|
if enable_i and fp_process and fp_needs_load: |
state.next = t_state.FP_LOAD |
elif enable_i and not fp_process and ip_process and ip_needs_load: |
state.next = t_state.IP_LOAD |
elif enable_i and not fp_process and ip_process and not ip_needs_load: |
state.next = t_state.FIP_SHIFT |
|
elif state == t_state.FP_LOAD: |
|
if not fp_needs_load and ip_process and ip_needs_load: |
state.next = t_state.IP_LOAD |
elif not fp_needs_load and not ip_process: |
state.next = t_state.FIP_SHIFT |
|
elif state == t_state.IP_LOAD: |
|
if not ip_needs_load: |
state.next = t_state.IP_SHIFT |
|
elif state == t_state.FIP_SHIFT: |
|
state.next = t_state.DOUT |
|
elif state == t_state.DOUT: |
|
if data_out_valid_o and not enable_i: |
state.next = t_state.IDLE |
elif data_out_valid_o and enable_i \ |
and fp_process and fp_needs_load: |
state.next = t_state.FP_LOAD |
elif data_out_valid_o and enable_i \ |
and not fp_process and ip_process and ip_needs_load: |
state.next = t_state.IP_LOAD |
elif data_out_valid_o and enable_i \ |
and fp_process or ip_process: |
state.next = t_state.FIP_SHIFT |
|
|
|
# |
# fast path load processing |
@always(clk.posedge, reset.negedge) |
def fp_load(): |
|
if reset: |
fast_rden_o.next = 0 |
fp_shift_load.next = 0 |
fp_shift_reg.next = 0 |
|
else: |
if state == t_state.FP_LOAD: |
if fp_needs_load and fast_data_avail_i and not fast_rden_o: |
fast_rden_o.next = 1 |
elif fast_rden_o: |
fast_rden_o.next = 0 |
fp_shift_load.next = fp_shift_load + 8 |
|
if fp_shift_load > 0: |
fp_shift_reg.next = concat(fast_data_i, fp_shift_reg[fp_shift_load:]) |
else: |
fp_shift_reg.next = fast_data_i |
|
elif state == t_state.FIP_SHIFT: |
fp_shift_load.next = fp_shift_load - fast_bits_i |
|
|
# |
# interleaved path load processing |
@always(clk.posedge, reset.negedge) |
def ip_load(): |
|
if reset: |
inter_rden_o.next = 0 |
ip_shift_load.next = 0 |
ip_shift_reg.next = 0 |
|
else: |
if state == t_state.IP_LOAD: |
if inter_data_avail_i and not inter_rden_o: |
inter_rden_o.next = 1 |
elif inter_rden_o: |
inter_rden_o.next = 0 |
ip_shift_load.next = ip_shift_load + 8 |
if ip_shift_load > 0: |
ip_shift_reg.next = concat(inter_data_i, ip_shift_reg[ip_shift_load:]) |
else: |
ip_shift_reg.next = inter_data_i |
|
elif state == t_state.FIP_SHIFT: |
ip_shift_load.next = ip_shift_load - inter_bits_i |
|
|
# |
# fast and interleaved path shift processing |
@always(clk.posedge, reset.negedge) |
def fp_ip_shift(): |
|
if reset: |
data_out_o.next = 0 |
data_out_valid_o.next = 0 |
|
else: |
|
if state == t_state.FIP_SHIFT: |
if fp_process and not ip_process: |
data_out_o.next = fp_shift_reg[fast_bits_i:] |
fp_shift_reg.next = fp_shift_reg[:fast_bits_i] |
|
elif not fp_process and ip_process: |
data_out_o.next = ip_shift_reg[inter_bits_i:] |
ip_shift_reg.next = ip_shift_reg[:inter_bits_i] |
|
else: |
data_out_o.next = concat( ip_shift_reg[inter_bits_i:], |
fp_shift_reg[fast_bits_i:]) |
fp_shift_reg.next = fp_shift_reg[:fast_bits_i] |
ip_shift_reg.next = ip_shift_reg[:inter_bits_i] |
|
data_out_valid_o.next = 1 |
|
else: |
|
data_out_valid_o.next = 0 |
|
return instances() |
/trunk/myhdl/rtl/fifo_sync.py
0,0 → 1,100
|
from myhdl import * |
|
|
|
def fifo_sync(clk, reset, |
full_o, wen_i, data_i, |
data_avail_o, rden_i, data_o, |
DWIDTH=8, SIZE=8): |
'''FIFO with clock synchronous read and write |
|
I/O pins: |
========= |
clk : clock signal |
reset : reset will empty the memory and reset full_o and |
data_avail_o signals |
full_o : signal that the FIFO is full. No further input data |
are taken. |
wen_i : write enable for the data_i data |
data_i : input data with data width as specified by DWIDTH |
|
data_avail_o : signal that there are data in the FIFO available for |
reading out |
rden_i : read enable for the data_o signal |
data_o : output data. Only valid if the data_avail_o signal is |
active |
|
parameters: |
=========== |
|
DWIDTH : data width |
SIZE : size of the FIFO |
''' |
|
mem = [Signal(intbv(0)[DWIDTH:]) for i in range(SIZE)] |
wp, rp = [Signal(int(0)) for i in range(2)] |
fill_ctr = Signal(intbv(0, min=0, max=SIZE+1)) |
|
@always(clk.posedge, reset.posedge) |
def write_data(): |
|
if reset: |
data_o.next = 0 |
rp.next = 0 |
|
else: |
if wen_i and not full_o: |
mem[wp.val].next = data_i |
if wp < SIZE-1: |
wp.next = wp + 1 |
else: |
wp.next = 0 |
|
@always(clk.posedge, reset.posedge) |
def read_data(): |
|
if reset: |
data_o.next = 0 |
rp.next = 0 |
|
else: |
if rden_i and data_avail_o: |
data_o.next = mem[rp.val] |
if rp < SIZE-1: |
rp.next = rp + 1 |
else: |
rp.next = 0 |
|
|
@always(clk.posedge, reset.posedge) |
def count_load(): |
|
if reset: |
fill_ctr.next = 0 |
else: |
|
if (rden_i and data_avail_o) and not (wen_i and not full_o) \ |
and (fill_ctr > 0): |
|
fill_ctr.next = fill_ctr - 1 |
|
elif (wen_i and not full_o) and not (rden_i and data_avail_o) \ |
and (fill_ctr < SIZE): |
|
fill_ctr.next = fill_ctr + 1 |
|
|
@always_comb |
def comb(): |
if fill_ctr == SIZE: |
full_o.next = 1 |
else: |
full_o.next = 0 |
|
if fill_ctr > 0: |
data_avail_o.next = 1 |
else: |
data_avail_o.next = 0 |
|
return instances() |
/trunk/myhdl/adsl_main.py
12,11 → 12,17
import unittest |
|
|
import test.test_flipSign, test.test_cmath, test.test_const_encoder |
import test.test_flipSign, \ |
test.test_cmath, \ |
test.test_const_encoder, \ |
test.test_fifo_sync, \ |
test.test_bit_order |
|
mL = [test.test_flipSign] |
mL.append(test.test_cmath) |
mL.append(test.test_const_encoder) |
mL.append(test.test_fifo_sync) |
mL.append(test.test_bit_order) |
|
tl = unittest.defaultTestLoader |
def suite(): |