OpenCores
URL https://opencores.org/ocsvn/dmt_tx/dmt_tx/trunk

Subversion Repositories dmt_tx

Compare Revisions

  • This comparison shows the changes necessary to convert path
    /
    from Rev 29 to Rev 30
    Reverse comparison

Rev 29 → Rev 30

/dmt_tx/trunk/myhdl/test/test_bit_order.py
0,0 → 1,283
#!/usr/bin/env python
 
import unittest
import os
from myhdl import *
 
import random
 
 
if __name__ == '__main__':
import sys
sys.path.append('../')
 
from rtl.bit_order import bit_order
 
 
class TestBitOrder(unittest.TestCase):
    '''Simulation based unit test for the bit ordering block.'''

    def test_bit_order_simple(self):
        '''Feed generated fast path data through the DUT and run the
        output checker against the calculated expected words.
        '''

        def bench(fp_dataL, ip_dataL, fip_bitL, expOutL):
            '''Test bench for the bit ordering block.

            fp_dataL : data bytes for the fast path
            ip_dataL : data bytes for the interleaved path
            fip_bitL : list of (fast bits, interleaved bits) tuples to
                       load per output word
            expOutL  : expected output words
            '''

            DWIDTH = 8

            # single bit control signals
            clk, reset, \
                fast_data_avail_i, fast_rden_o, \
                inter_data_avail_i, inter_rden_o, \
                ready_o, enable_i, \
                data_out_valid_o \
                = [Signal(bool(0)) for _ in range(9)]

            # FIFO data inputs of the DUT
            fast_data_i, inter_data_i = [
                Signal(intbv(0)[DWIDTH:]) for _ in range(2)]

            # number of bits to take from each path
            fast_bits_i, inter_bits_i = [
                Signal(intbv(0)[4:]) for _ in range(2)]

            data_out_o = Signal(intbv(0)[15:])

            #
            # device under test
            #
            bit_order_inst = bit_order(
                clk, reset,
                fast_data_avail_i, fast_rden_o, fast_data_i,
                inter_data_avail_i, inter_rden_o, inter_data_i,
                ready_o, enable_i, fast_bits_i, inter_bits_i,
                data_out_valid_o, data_out_o)

            @always(delay(10))
            def clkgen():
                clk.next = not clk

            @instance
            def sendInputData():
                # start serving both FIFO interfaces once reset is released
                yield reset.negedge
                yield join(
                    fifo(clk, fast_data_avail_i, fast_rden_o,
                         fast_data_i, fp_dataL),
                    fifo(clk, inter_data_avail_i, inter_rden_o,
                         inter_data_i, ip_dataL))

            @instance
            def ctrlBitOrdering():
                # one clock period of reset
                yield clk.negedge
                reset.next = 1
                yield clk.negedge
                reset.next = 0

                yield applyBitValues(clk, ready_o, enable_i,
                                     fast_bits_i, inter_bits_i,
                                     fip_bitL)

            @instance
            def verify():
                yield verifyOutput(clk, data_out_valid_o, data_out_o,
                                   expOutL)

                # all expected words have been seen, end the simulation
                raise StopSimulation

            return instances()

        #####################################

        # stimulus and the matching expected output words
        fp_dataL, ip_dataL, fip_bitL = genData()
        expOutL = calcExpectedOutput(fp_dataL, ip_dataL, fip_bitL)

        testbench = bench(fp_dataL, ip_dataL, fip_bitL, expOutL)
        simulation = Simulation(testbench)
        simulation.run()
 
##########################################################################
# bus functional models
def verifyOutput(clk, data_out_valid_o, data_out_o, expOutL,
                 TIMEOUT=10):
    '''Bus functional model for the bit order block output

    The expected output is provided with expOutL as a list of integers.
    For every entry the function waits for an active data_out_valid_o
    signal, compares data_out_o with the entry and then waits for
    data_out_valid_o to go inactive again.

    clk              : clock, the valid signal is polled on the negative
                       edge
    data_out_valid_o : valid flag of the block output
    data_out_o       : output word of the block
    expOutL          : list of expected output words
    TIMEOUT          : maximum number of negative clock edges to wait for
                       data_out_valid_o per word

    Raises StopSimulation when the valid signal does not show up within
    TIMEOUT clock cycles and AssertionError when an output word differs
    from the expected value.
    '''
    for exp_value in expOutL:
        to_ctr = 0
        while data_out_valid_o == 0:
            yield clk.negedge
            to_ctr += 1
            if to_ctr > TIMEOUT:
                # call form of raise is valid in Python 2 and Python 3
                raise StopSimulation("verifyOutput timeout ERROR")

        # this is the comparison the docstring always promised; raise
        # explicitly so the check survives running with python -O
        if data_out_o != exp_value:
            raise AssertionError(
                "data_out_o: 0x%x expected: 0x%x"
                % (int(data_out_o), exp_value))

        yield data_out_valid_o.negedge
 
 
def applyBitValues(clk, ready_o, enable_i, fast_bits_i, inter_bits_i, fip_bitL):
    '''BFM driving the bit count inputs and the enable pulse

    For every (fast bits, interleaved bits) tuple in fip_bitL the
    function polls ready_o on the negative clock edge until it is
    active, drives fast_bits_i and inter_bits_i with the tuple values
    and asserts enable_i for one clock period.
    '''
    for entry in fip_bitL:
        fast_bits, inter_bits = entry

        # hold off until the block signals that it can take a request
        while ready_o == 0:
            yield clk.negedge

        fast_bits_i.next = fast_bits
        inter_bits_i.next = inter_bits
        enable_i.next = 1

        # keep the enable active for one clock period
        yield clk.negedge
        enable_i.next = 0
 
 
def fifo(clk, data_avail_i, rden_o, data_i, dataL):
    '''BFM presenting a list of data words on a FIFO style interface

    Sets data_avail_i active, then puts one word of dataL after the
    other on data_i. After a word has been applied the function polls
    rden_o on the positive clock edge and moves on to the next word as
    soon as rden_o is seen active.
    '''
    data_avail_i.next = 1

    for word in dataL:
        data_i.next = word
        # wait for the read request from the bit order block
        while rden_o == 0:
            yield clk.posedge
 
 
##########################################################################
# functions for data generation and calculating expected output
def genData(fp_bitL=None):
    '''Generate test data for the bit order block verification
    Return fp_dataL, ip_dataL, fip_bitL

    fp_bitL  : optional list with the number of fast path bits per
               output word. Defaults to [2, 8]. The passed list is not
               modified.

    fp_dataL : contains the data for the fast path. One byte per
               entry. The number of bytes matches the sum of the fast
               path bits in fip_bitL.
    ip_dataL : contains the data for the interleaved path. Always
               empty, as no interleaved bits are requested.
    fip_bitL : list of tuples, with tuples of size 2. Index 0 is
               the number of bits from the fast path and index 1
               is the number of bits from the interleaved path
               (always 0).
    '''
    if fp_bitL is None:
        fp_bitL = [2, 8]
    else:
        # work on a copy, a padding entry may get appended below
        fp_bitL = list(fp_bitL)

    # pad the bit list so that the sum of bits is a multiple of 8
    rem_bit_sum = sum(fp_bitL) % 8
    if rem_bit_sum:
        pad_bits = 8 - rem_bit_sum
        # there is no carrier load of 1 bit,
        # so in case we need to add 1, add 9
        if pad_bits == 1:
            pad_bits = 9
        fp_bitL.append(pad_bits)

    # no bits are taken from the interleaved path
    fip_bitL = [(fp_bits, 0) for fp_bits in fp_bitL]

    # floor division keeps the byte count an integer on Python 2 and 3
    byte_num = sum(fp_bitL) // 8

    # deterministic byte pattern starting at 16, kept within 8 bit
    fp_dataL = [(16 + i) % 256 for i in range(byte_num)]
    ip_dataL = []

    return fp_dataL, ip_dataL, fip_bitL
 
 
def calcExpectedOutput(fp_dataL, ip_dataL, fip_bitL):
    '''Calculate the expected output of the bit order block
    Return a list with the expected out data words

    fp_dataL : fast path data, one byte per entry
    ip_dataL : interleaved path data, one byte per entry
    fip_bitL : list of (fast bits, interleaved bits) tuples, one tuple
               per output word

    The fast path bits occupy the least significant bits of an output
    word and the interleaved path bits are placed directly above them.
    Raises IndexError if a path still holds data but fewer bits than
    requested.
    '''
    expOutL = []

    # construct a list of bit values from the data. Data for each path
    # come in as one byte per list entry
    fpL = []
    for byte in fp_dataL:
        fpL.extend(byteToBoolList(byte))

    ipL = []
    for byte in ip_dataL:
        ipL.extend(byteToBoolList(byte))

    # reverse the lists, so that the lsb of the first byte is at index
    # -1, as pop() takes the data from the end of the list
    fpL.reverse()
    ipL.reverse()

    for fp_bits, ip_bits in fip_bitL:
        data_out = 0
        # get first the bits from the fast path
        if fpL:
            for i in range(fp_bits):
                data_out |= fpL.pop() << i
        # now go on with the interleaved path. Its bits sit above the
        # fast path bits, so they must not be ORed over bit 0 upwards
        if ipL:
            for i in range(ip_bits):
                data_out |= ipL.pop() << (fp_bits + i)

        expOutL.append(data_out)

    return expOutL
 
 
 
def byteToBoolList(byte):
    '''Split the lower 8 bits of an integer into a list of 0/1 values

    The returned list always has 8 entries, index 0 holds the lsb.
    '''
    return [1 if (byte >> pos) & 0x1 else 0 for pos in range(8)]
 
########################################################################
# main
if __name__ == '__main__':
    # run only the bit order test case, with one result line per test
    loader = unittest.TestLoader()
    bit_order_suite = loader.loadTestsFromTestCase(TestBitOrder)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(bit_order_suite)
dmt_tx/trunk/myhdl/test/test_bit_order.py Property changes : Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: dmt_tx/trunk/myhdl/test/test_fifo_sync.py =================================================================== --- dmt_tx/trunk/myhdl/test/test_fifo_sync.py (nonexistent) +++ dmt_tx/trunk/myhdl/test/test_fifo_sync.py (revision 30) @@ -0,0 +1,152 @@ +#!/usr/bin/env python + +import unittest +import os +from myhdl import * + +if __name__ == '__main__': + import sys + sys.path.append('../') + +from rtl.fifo_sync import fifo_sync + + +class TestFifo(unittest.TestCase): + + def test_fifo_simple(self): + + def bench(): + + DWIDTH = 4 + SIZE = 4 + + clk, reset, full_o, wen_i, data_avail_o, rden_i = \ + [Signal(bool(0)) for i in range(6)] + + data_i, data_o = [Signal(intbv(0)[DWIDTH:]) for i in range(2)] + + fifo_inst = fifo_sync( clk, reset, + full_o, wen_i, data_i, + data_avail_o, rden_i, data_o, + DWIDTH,SIZE) + + @always(delay(10)) + def clkgen(): + clk.next = not clk + + @instance + def stim_and_check(): + reset.next = 1 + yield clk.negedge + reset.next = 0 + yield clk.negedge + + self.assertEqual(full_o, 0) + self.assertEqual(data_avail_o, 0) + + # write fifo full + yield clk.negedge + for i in range(SIZE+1): + + data_i.next = i + wen_i.next = 1 + yield clk.negedge + + if i < SIZE-1: + msg = "full (0): %d at %d"%(full_o, now()) + self.assertEqual(full_o, 0, msg) + else: + msg = "full (1): %d at %d"%(full_o, now()) + self.assertEqual(full_o, 1, msg) + + wen_i.next = 0 + + # read fifo back empty + yield clk.negedge + rden_i.next = 1 + + yield clk.posedge + self.assertEqual(full_o, 1) + + yield delay(1) + self.assertEqual(full_o, 0) + + for i in range(SIZE+1): + yield clk.negedge + + if i < SIZE-1: + msg = "i: %d data_o: %d at %d"%(i, data_o, now()) + self.assertEqual(data_o, i, msg) + elif i == SIZE-1: + msg = "i: %d data_o: %d at %d"%(i, data_o, now()) + self.assertEqual(data_o, SIZE-1, msg) + self.assertEqual(data_avail_o, 0) + + 
rden_i.next = 0 + yield clk.negedge + + # + # fill 2 words, read/write 3 words, should not get full + # + + # 1st data + data_i.next = 1 + wen_i.next = 1 + yield clk.negedge + + self.assertEqual(data_avail_o, 1) + + # 2nd data + data_i.next = 2 + yield clk.negedge + + # write and read + rden_i.next = 1 + for i in range(3): + data_i.next = 11 + i + yield clk.negedge + + self.assertEqual(full_o, 0) + self.assertEqual(data_avail_o, 1) + + if i == 0: + self.assertEqual(data_o, 1) + elif i == 1: + self.assertEqual(data_o, 2) + elif i == 2: + self.assertEqual(data_o, 11) + + wen_i.next = 0 + + yield clk.negedge + self.assertEqual(full_o, 0) + self.assertEqual(data_avail_o, 1) + self.assertEqual(data_o, 12) + + yield clk.negedge + self.assertEqual(full_o, 0) + self.assertEqual(data_avail_o, 0) + self.assertEqual(data_o, 13) + + + yield clk.negedge + self.assertEqual(full_o, 0) + self.assertEqual(data_avail_o, 0) + self.assertEqual(data_o, 13) + + rden_i.next = 0 + + raise StopSimulation + + return instances() + + tb = bench() + #tb = traceSignals(bench) + sim = Simulation(tb) + sim.run() + +######################################################################## +# main +if __name__ == '__main__': + suite = unittest.TestLoader().loadTestsFromTestCase(TestFifo) + unittest.TextTestRunner(verbosity=2).run(suite)
dmt_tx/trunk/myhdl/test/test_fifo_sync.py Property changes : Added: svn:executable ## -0,0 +1 ## +* \ No newline at end of property Index: dmt_tx/trunk/myhdl/test/test_const_encoder.py =================================================================== --- dmt_tx/trunk/myhdl/test/test_const_encoder.py (revision 29) +++ dmt_tx/trunk/myhdl/test/test_const_encoder.py (revision 30) @@ -46,8 +46,14 @@ yield clk.negedge reset.next = 0 + + # quick test + sizeL = [3, 4, 5, 6, 7, 8, 9, 10, 11] - for size in [4, 5]: + # full test + #sizeL = [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + + for size in sizeL: #print #print 'Testing const_size %d'%size #print
/dmt_tx/trunk/myhdl/rtl/bit_order.py
0,0 → 1,221
 
 
from myhdl import *
 
 
def bit_order(clk, reset,
              fast_data_avail_i, fast_rden_o, fast_data_i,
              inter_data_avail_i, inter_rden_o, inter_data_i,
              ready_o, enable_i, fast_bits_i, inter_bits_i,
              data_out_valid_o, data_out_o,
              DWIDTH=15):
    '''Bit order block

    I/O pins:
    =========
    clk                 : clock input
    reset               : reset input, active high

    fast_data_avail_i   : signal that there are fast data available
    fast_rden_o         : read pulse for the fast data input
    fast_data_i         : fast data input. Data come from the FIFO and are 8
                          bit wide

    inter_data_avail_i  : signal that there are interleaved data available
    inter_rden_o        : read pulse for the interleaved data input
    inter_data_i        : interleaved data input. Data come from the FIFO
                          and are 8 bit wide

    ready_o             : signal that with enable_i a new processing cycle
                          can start
    enable_i            : pulse to enable the processing for the set input
                          data
    fast_bits_i         : number of bits to be extracted from the fast data
                          path
    inter_bits_i        : number of bits to be extracted from the
                          interleaved data path

    data_out_valid_o    : signal that after an enable_i pulse the data_out_o
                          value is valid
    data_out_o          : DWIDTH bit wide output data. The fast path bits
                          are placed in the least significant bits, the
                          interleaved path bits directly above them


    parameters:
    ===========

    DWIDTH              : output data width. Resembles the maximum number
                          of bits to be loaded per carrier
    '''
    #
    # signal setup

    t_state = enum('IDLE', 'FP_LOAD', 'IP_LOAD', 'FIP_SHIFT', 'DOUT')

    state = Signal(t_state.IDLE)

    SWIDTH = (DWIDTH-1-8) + 2*8     # shift register width

    fp_shift_reg, ip_shift_reg \
        = [Signal(intbv(0)[SWIDTH:]) for i in range(2)]
    fp_process, ip_process, fp_needs_load, ip_needs_load \
        = [Signal(bool(0)) for i in range(4)]

    # number of valid bits currently held in each shift register
    fp_shift_load, ip_shift_load \
        = [Signal(int(0)) for i in range(2)]

    #
    # logic

    @always_comb
    def comb_logic_ready():
        if state == t_state.IDLE or data_out_valid_o:
            ready_o.next = 1
        else:
            ready_o.next = 0

    @always_comb
    def comb_logic():
        # a path needs a load when more bits are requested than buffered
        fp_needs_load.next = fast_bits_i > fp_shift_load
        ip_needs_load.next = inter_bits_i > ip_shift_load

        # a path takes part in the cycle when bits are requested from it
        fp_process.next = fast_bits_i > 0
        ip_process.next = inter_bits_i > 0

    @always(clk.posedge, reset.posedge)
    def fsm():
        if reset:
            state.next = t_state.IDLE
        else:
            if state == t_state.IDLE:

                # load whatever path is short of bits first and shift
                # directly when both paths already hold enough bits
                if enable_i and fp_process and fp_needs_load:
                    state.next = t_state.FP_LOAD
                elif enable_i and ip_process and ip_needs_load:
                    state.next = t_state.IP_LOAD
                elif enable_i and (fp_process or ip_process):
                    state.next = t_state.FIP_SHIFT

            elif state == t_state.FP_LOAD:

                if not fp_needs_load:
                    if ip_process and ip_needs_load:
                        state.next = t_state.IP_LOAD
                    else:
                        state.next = t_state.FIP_SHIFT

            elif state == t_state.IP_LOAD:

                # the enum has no IP_SHIFT state, both paths are shifted
                # out together in FIP_SHIFT
                if not ip_needs_load:
                    state.next = t_state.FIP_SHIFT

            elif state == t_state.FIP_SHIFT:

                state.next = t_state.DOUT

            elif state == t_state.DOUT:

                if data_out_valid_o and not enable_i:
                    state.next = t_state.IDLE
                elif data_out_valid_o and enable_i \
                        and fp_process and fp_needs_load:
                    state.next = t_state.FP_LOAD
                elif data_out_valid_o and enable_i \
                        and ip_process and ip_needs_load:
                    state.next = t_state.IP_LOAD
                elif data_out_valid_o and enable_i \
                        and (fp_process or ip_process):
                    state.next = t_state.FIP_SHIFT
                else:
                    # nothing to process: fall back to IDLE instead of
                    # staying in DOUT with ready_o inactive
                    state.next = t_state.IDLE

    #
    # fast path load processing
    @always(clk.posedge, reset.posedge)
    def fp_load():

        if reset:
            fast_rden_o.next = 0
            fp_shift_load.next = 0
            fp_shift_reg.next = 0

        else:
            if state == t_state.FP_LOAD:
                if fp_needs_load and fast_data_avail_i and not fast_rden_o:
                    fast_rden_o.next = 1
                elif fast_rden_o:
                    fast_rden_o.next = 0
                    fp_shift_load.next = fp_shift_load + 8

                    # put the new byte above the bits still buffered
                    if fp_shift_load > 0:
                        fp_shift_reg.next = concat(
                            fast_data_i, fp_shift_reg[fp_shift_load:])
                    else:
                        fp_shift_reg.next = fast_data_i
            elif state == t_state.FIP_SHIFT:
                fp_shift_load.next = fp_shift_load - fast_bits_i

    #
    # interleaved path load processing
    @always(clk.posedge, reset.posedge)
    def ip_load():

        if reset:
            inter_rden_o.next = 0
            ip_shift_load.next = 0
            ip_shift_reg.next = 0

        else:
            if state == t_state.IP_LOAD:
                # same guard as in the fast path: without it a further
                # read is requested in the cycle the state is left
                if ip_needs_load and inter_data_avail_i \
                        and not inter_rden_o:
                    inter_rden_o.next = 1
                elif inter_rden_o:
                    inter_rden_o.next = 0
                    ip_shift_load.next = ip_shift_load + 8

                    # put the new byte above the bits still buffered
                    if ip_shift_load > 0:
                        ip_shift_reg.next = concat(
                            inter_data_i, ip_shift_reg[ip_shift_load:])
                    else:
                        ip_shift_reg.next = inter_data_i

            elif state == t_state.FIP_SHIFT:
                ip_shift_load.next = ip_shift_load - inter_bits_i

    #
    # fast and interleaved path shift processing
    @always(clk.posedge, reset.posedge)
    def fp_ip_shift():

        if reset:
            data_out_o.next = 0
            data_out_valid_o.next = 0

        else:

            if state == t_state.FIP_SHIFT:
                if fp_process and not ip_process:
                    data_out_o.next = fp_shift_reg[fast_bits_i:]
                    fp_shift_reg.next = fp_shift_reg[:fast_bits_i]

                elif not fp_process and ip_process:
                    data_out_o.next = ip_shift_reg[inter_bits_i:]
                    ip_shift_reg.next = ip_shift_reg[:inter_bits_i]

                else:
                    data_out_o.next = concat(ip_shift_reg[inter_bits_i:],
                                             fp_shift_reg[fast_bits_i:])
                    fp_shift_reg.next = fp_shift_reg[:fast_bits_i]
                    ip_shift_reg.next = ip_shift_reg[:inter_bits_i]

                data_out_valid_o.next = 1

            else:
                data_out_valid_o.next = 0

    return instances()
/dmt_tx/trunk/myhdl/rtl/fifo_sync.py
0,0 → 1,100
 
from myhdl import *
 
 
 
def fifo_sync(clk, reset,
              full_o, wen_i, data_i,
              data_avail_o, rden_i, data_o,
              DWIDTH=8, SIZE=8):
    '''FIFO with clock synchronous read and write

    I/O pins:
    =========
    clk           : clock signal
    reset         : reset (active high) sets the write pointer, the read
                    pointer, the fill counter and data_o to 0. With the
                    fill counter at 0 full_o and data_avail_o are
                    inactive
    full_o        : signal that the FIFO is full. No further input data
                    are taken.
    wen_i         : write enable for the data_i data
    data_i        : input data with data width as specified by DWIDTH

    data_avail_o  : signal that there are data in the FIFO available for
                    reading out
    rden_i        : read enable for the data_o signal
    data_o        : output data, registered. Updated on a clock edge
                    with rden_i and data_avail_o active

    parameters:
    ===========

    DWIDTH        : data width
    SIZE          : size of the FIFO
    '''

    mem = [Signal(intbv(0)[DWIDTH:]) for i in range(SIZE)]
    wp, rp = [Signal(int(0)) for i in range(2)]
    fill_ctr = Signal(intbv(0, min=0, max=SIZE+1))

    @always(clk.posedge, reset.posedge)
    def write_data():
        if reset:
            # only the write pointer belongs to this process, data_o and
            # rp are reset by read_data
            wp.next = 0

        else:
            if wen_i and not full_o:
                mem[wp.val].next = data_i
                # advance the write pointer with wrap around
                if wp < SIZE-1:
                    wp.next = wp + 1
                else:
                    wp.next = 0

    @always(clk.posedge, reset.posedge)
    def read_data():

        if reset:
            data_o.next = 0
            rp.next = 0

        else:
            if rden_i and data_avail_o:
                data_o.next = mem[rp.val]
                # advance the read pointer with wrap around
                if rp < SIZE-1:
                    rp.next = rp + 1
                else:
                    rp.next = 0

    @always(clk.posedge, reset.posedge)
    def count_load():

        if reset:
            fill_ctr.next = 0
        else:

            # read only: one word less in the FIFO
            if (rden_i and data_avail_o) and not (wen_i and not full_o) \
                    and (fill_ctr > 0):

                fill_ctr.next = fill_ctr - 1

            # write only: one word more in the FIFO. An accepted read
            # and write in the same cycle leave the counter unchanged
            elif (wen_i and not full_o) and not (rden_i and data_avail_o) \
                    and (fill_ctr < SIZE):

                fill_ctr.next = fill_ctr + 1

    @always_comb
    def comb():
        if fill_ctr == SIZE:
            full_o.next = 1
        else:
            full_o.next = 0

        if fill_ctr > 0:
            data_avail_o.next = 1
        else:
            data_avail_o.next = 0

    return instances()
/dmt_tx/trunk/myhdl/adsl_main.py
12,11 → 12,17
import unittest
 
 
import test.test_flipSign, test.test_cmath, test.test_const_encoder
import test.test_flipSign, \
test.test_cmath, \
test.test_const_encoder, \
test.test_fifo_sync, \
test.test_bit_order
 
mL = [test.test_flipSign]
mL.append(test.test_cmath)
mL.append(test.test_const_encoder)
mL.append(test.test_fifo_sync)
mL.append(test.test_bit_order)
 
tl = unittest.defaultTestLoader
def suite():

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.