OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Rev 6

Compare with Previous | Blame | View Log

import unittest
import os
import zlib
import random
 
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
                  Cosimulation, block, instance, StopSimulation, modbv, \
                  always, always_seq, always_comb, enum, Error
 
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, \
                    CWINDOW, COMPRESS, DECOMPRESS, OBSIZE, LMAX, LIBSIZE
 
# Maximum distance (in bytes) the stimulus lets its write index run ahead of
# the DUT's reported input progress; simply the DUT's window size.
MAXW = CWINDOW

# Set to True to run the tests against the converted Verilog through
# Icarus Verilog co-simulation instead of the native MyHDL model.
COSIMULATION = False

if COSIMULATION:
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                o_byte, i_waddr, i_raddr, clk, reset):
        """Compile the Verilog DUT with iverilog and return a Cosimulation.

        Takes the same port signals as the native ``deflate`` block and
        wires each of them, by name, to the co-simulated design.
        """
        print("Cosimulation")
        # "dump.v " can be appended to the sources to get a waveform dump.
        verilog_sources = ("deflate.v ", "tb_deflate.v ")
        compile_cmd = "iverilog -o deflate " + "".join(verilog_sources)
        os.system(compile_cmd)
        port_map = dict(i_mode=i_mode, o_done=o_done,
                        i_data=i_data, o_iprogress=o_iprogress,
                        o_oprogress=o_oprogress,
                        o_byte=o_byte, i_waddr=i_waddr, i_raddr=i_raddr,
                        clk=clk, reset=reset)
        return Cosimulation("vvp -m ./myhdl deflate", **port_map)
else:
    # Native simulation: use the MyHDL implementation directly.
    from deflate import deflate
 
 
def test_data(m, tlen=100, limit=False):
    print("MODE", m, tlen)
    if m == 0:
        str_data = " ".join(["Hello World! " + str(1) + " "
                             for i in range(tlen)])
        b_data = str_data.encode('utf-8')
    elif m == 1:
        str_data = " ".join(["   Hello World! " + str(i) + "     "
                             for i in range(tlen)])
        b_data = str_data.encode('utf-8')
    elif m == 2:
        str_data = " ".join(["Hi: " + str(random.randrange(0,0x1000)) + " "
                             for i in range(tlen)])
        b_data = str_data.encode('utf-8')
    elif m == 3:
        b_data = bytes([random.randrange(0,0x100) for i in range(tlen)])
    elif m == 4:
        str_data = "".join([str(random.randrange(0,2))
                             for i in range(tlen)])
        b_data = str_data.encode('utf-8')
    else:
        raise Error("unknown test mode")
    # print(str_data)
    if limit:
        b_data = b_data[:IBSIZE - 4 - 10]
    zl_data = zlib.compress(b_data)
    print("From %d to %d bytes" % (len(b_data), len(zl_data)))
    print(zl_data[:500])
    return b_data, zl_data
 
 
class TestDeflate(unittest.TestCase):
    """Simulation tests that stream data through the `deflate` DUT.

    For every data mode produced by `test_data`, the DUT's decompressed
    output is compared with the original payload, and the DUT's compressed
    output is checked by inflating it with zlib.
    """

    def testMain(self):
        # Defines the stimulus generator below and runs it once per data
        # mode (0..4) through runTests().  The generator reads `mode` from
        # this method's scope, i.e. the loop variable at the bottom.

        def test_decompress(i_mode, o_done, i_data, o_iprogress,
                            o_oprogress, o_byte, i_waddr, i_raddr, clk, reset):
            """Stimulus generator: drive the DUT ports and verify its output.

            The clock is toggled by hand: each `tick(); yield delay(5)` pair
            is one clock edge, so two pairs make one 10-time-unit cycle (the
            cycle counts printed below divide elapsed time by 10).

            Runs the streaming decompress test (only if DECOMPRESS is set)
            followed by the streaming compress test.
            """

            def tick():
                # Schedule a clock edge; it takes effect at the next yield.
                clk.next = not clk

            print("")
            print("==========================")
            print("START TEST MODE", mode)
            print("==========================")

            # Payload and its zlib-compressed reference for this mode.
            b_data, zl_data = test_data(mode, 25000)

            # A reset pulse (low, then high) is only applied for mode 0.
            if mode == 0:
                reset.next = 0
                tick()
                yield delay(5)
                reset.next = 1
                tick()
                yield delay(5)

            if DECOMPRESS:
                print("==========STREAMING DECOMPRESS TEST=========")

                print("STREAM LENGTH", len(zl_data))

                # One full clock cycle in IDLE before starting.
                i_mode.next = IDLE
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                # One full clock cycle with the start-decompress command.
                print("STARTD")
                i_mode.next = STARTD
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("WRITE")
                i = 0            # index of the next compressed byte to write
                ri = 0           # index of the next output byte to read
                first = 1        # NOTE(review): never read afterwards
                sresult = []     # output bytes collected so far
                start = now()
                wait = 0         # iterations in which a write had to be held back
                while True:
                    if ri >= 1000 and ri % 10000 == 0:
                        print(ri)
                    # Read phase: if the DUT has produced more output than
                    # we have fetched, spend one cycle issuing a READ.
                    if ri < o_oprogress:
                        did_read = 1
                        # print("do read", ri, o_oprogress)
                        i_mode.next = READ
                        i_raddr.next = ri
                        tick()
                        yield delay(5)
                        tick()
                        yield delay(5)
                        ri = ri + 1
                    else:
                        did_read = 0

                    # Write phase: feed the next input byte, but only while
                    # the write index stays within MAXW of the DUT's
                    # reported input progress.
                    if i < len(zl_data):
                        if o_iprogress > i - MAXW:
                            i_mode.next = WRITE
                            i_waddr.next = i
                            i_data.next = zl_data[i]
                            # print("write", i, zl_data[i])
                            i = i + 1
                        else:
                            # print("Wait for space", i)
                            wait += 1
                    else:
                        i_mode.next = IDLE

                    tick()
                    yield delay(5)
                    tick()
                    yield delay(5)

                    # The byte requested in the read phase is sampled only
                    # after the write/idle cycle above has completed.
                    if did_read:
                        # print("read", ri, o_oprogress, o_byte)
                        sresult.append(bytes([o_byte]))

                    # Finished once the DUT signals done and every produced
                    # byte has been fetched.
                    if o_done:
                        # print("DONE", o_oprogress, ri)
                        if o_oprogress == ri:
                            break;

                # Return to IDLE for one full cycle.
                i_mode.next = IDLE
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("IN/OUT/CYCLES/WAIT", len(zl_data), len(sresult),
                      (now() - start) // 10, wait)
                sresult = b''.join(sresult)
                # The DUT's inflated output must equal the original payload.
                self.assertEqual(b_data, sresult)
                # self.assertEqual(b_data[:100000], sresult[:100000])
                print("Decompress OK!")

            # NOTE(review): unlike the decompress test, this section is not
            # guarded by the COMPRESS flag -- confirm that is intended.
            print("==========STREAMING COMPRESS TEST=========")

            # One full clock cycle with the start-compress command.
            print("STARTC")
            i_mode.next = STARTC
            tick()
            yield delay(5)
            tick()
            yield delay(5)

            print("WRITE")
            i = 0            # index of the next uncompressed byte to write
            ri = 0           # index of the next compressed byte to read
            slen = 25000     # number of input bytes to stream in
            sresult = []
            wait = 0
            start = now()
            while True:
                # Read phase (same scheme as in the decompress loop).
                if ri < o_oprogress:
                    did_read = 1
                    # print("do read", ri, o_oprogress)
                    i_mode.next = READ
                    i_raddr.next = ri
                    tick()
                    yield delay(5)
                    tick()
                    yield delay(5)
                    if ri % 10000 == 0:
                        print(ri)
                    ri = ri + 1
                else:
                    did_read = 0

                # Write phase; the payload is indexed modulo its length, so
                # it repeats if it is shorter than slen.
                if i < slen:
                    if o_iprogress > i - MAXW:
                        i_mode.next = WRITE
                        i_waddr.next = i
                        i_data.next = b_data[i % len(b_data)]
                        # print("write", i, b_data[i % len(b_data)])
                        i = i + 1
                    else:
                        # print("Wait for space", i)
                        wait += 1
                else:
                    i_mode.next = IDLE

                tick()
                yield delay(5)
                tick()
                yield delay(5)

                if did_read:
                    # print("read", ri, o_oprogress, o_byte)
                    sresult.append(bytes([o_byte]))

                if o_done:
                    # print("DONE", o_oprogress, ri)
                    if o_oprogress == ri:
                        break;

            i_mode.next = IDLE

            print("IN/OUT/CYCLES/WAIT", slen, len(sresult),
                  (now() - start) // 10, wait)
            sresult = b''.join(sresult)
            print("zlib test:", zlib.decompress(sresult)[:60])
            # Compare only the prefix both sides are guaranteed to have.
            rlen = min(len(b_data), slen)
            self.assertEqual(zlib.decompress(sresult)[:rlen], b_data[:rlen])
            print("DONE!")


        # A fresh DUT and simulation are built for every mode.
        for loop in range(1):
            for mode in range(5):
            # for mode in range(2,3):
                self.runTests(test_decompress)

    def runTests(self, test):
        """Build the DUT port signals and simulate the DUT with `test`.

        `test` is called with the port signals in the same order as the DUT
        (i_mode, o_done, i_data, o_iprogress, o_oprogress, o_byte, i_waddr,
        i_raddr, clk, reset) and must return a generator for the simulator.
        """

        i_mode = Signal(intbv(0)[3:])
        o_done = Signal(bool(0))

        i_data = Signal(intbv()[8:])
        o_byte = Signal(intbv()[8:])
        o_iprogress = Signal(intbv()[LMAX:])
        o_oprogress = Signal(intbv()[LMAX:])
        # Address signals use modbv, unlike the intbv progress counters.
        i_waddr = Signal(modbv()[LMAX:])
        i_raddr = Signal(modbv()[LMAX:])

        clk = Signal(bool(0))
        # Positional arguments: initial value 1, active level 0, asynchronous.
        reset = ResetSignal(1, 0, True)

        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                      o_byte, i_waddr, i_raddr, clk, reset)

        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                     o_byte, i_waddr, i_raddr, clk, reset)
        sim = Simulation(dut, check)
        # traceSignals(dut)
        sim.run(quiet=1)
 
 
# Bit width of the `scounter` prescaler inside test_deflate_bench.  The
# bench also uses it to decide whether to throttle its state machine
# (SLOWDOWN > 2), whether to stop the simulation when done (SLOWDOWN <= 4)
# and whether to include its own clock generator (SLOWDOWN == 1).
# It is reassigned further down before each bench instantiation.
SLOWDOWN = 1
 
# Self-checking bench around the `deflate` DUT, written as a clocked state
# machine (this is the block that gets converted at the bottom of the file).
#
# Ports:
#   i_clk   -- clock; generated internally only when SLOWDOWN == 1
#   o_led   -- progress display, driven with the running byte index `tbi`
#   led0_g, led1_b, led2_r -- status LEDs (green / blue / red by name)
#
# Sequence: write the zlib reference stream into the DUT, decompress it and
# compare with the original data; then (if COMPRESS) compress the original
# data with the DUT, copy the result back to the DUT input, decompress it
# again and compare once more.
@block
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):

    # NOTE(review): IBSIZE is passed as test_data's `limit` argument, where
    # it only acts as a truthy flag enabling truncation -- confirm intent.
    u_data, c_data = test_data(1, 100, IBSIZE)

    # Constant tuples indexed by the state machine below.
    CDATA = tuple(c_data)
    UDATA = tuple(u_data)

    i_mode = Signal(intbv(0)[3:])
    o_done = Signal(bool(0))

    i_data = Signal(intbv()[8:])
    o_byte = Signal(intbv()[8:])
    # NOTE(review): rebinds the name u_data (the bytes above, already
    # captured in UDATA); this signal is not referenced again below.
    u_data = Signal(intbv()[8:])
    o_iprogress = Signal(intbv()[LMAX:])
    o_oprogress = Signal(intbv()[LMAX:])
    resultlen = Signal(intbv()[LMAX:])
    i_waddr = Signal(modbv()[LMAX:])
    # NOTE(review): LBSIZE bits wide here, whereas TestDeflate.runTests uses
    # LMAX for the same port -- confirm which width is intended.
    i_raddr = Signal(modbv()[LBSIZE:])

    # Positional arguments: initial value 1, active level 0, asynchronous.
    reset = ResetSignal(1, 0, True)

    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                  o_byte, i_waddr, i_raddr, i_clk, reset)

    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
                    'FAIL', 'HALT')
    tstate = Signal(tb_state.RESET)

    tbi = Signal(modbv(0)[15:])     # running byte index / small counter
    copy = Signal(intbv()[8:])      # holds one byte while copying in CRESULT

    # Prescaler (SLOWDOWN bits, wraps) and a counter of its wrap-arounds.
    scounter = Signal(modbv(0)[SLOWDOWN:])
    counter = Signal(modbv(0)[16:])

    # Set for one step after a read address is issued, before o_byte is used.
    wtick = Signal(bool(0))

    # 6-bit wrapping counter used to time the PAUSE / CPAUSE states.
    resume = Signal(modbv(0)[6:])

    # Free-running clock, toggling every 5 time units.
    @instance
    def clkgen():
        i_clk.next = 0
        while True:
            yield delay(5)
            i_clk.next = not i_clk

    # scounter increments every clock; counter increments whenever
    # scounter is 0.
    @always(i_clk.posedge)
    def count():
        # o_led.next = counter
        if scounter == 0:
            counter.next = counter + 1
        scounter.next = scounter + 1

    # Main test state machine.
    @always(i_clk.posedge)
    def logic():

        # RESET: assert reset (active low), clear LEDs and the index.
        if tstate == tb_state.RESET:
            print("RESET", counter)
            reset.next = 0
            led0_g.next = 0
            led1_b.next = 0
            led2_r.next = 0
            tbi.next = 0
            tstate.next = tb_state.START

        # Throttle: with SLOWDOWN > 2 every state below only acts on clocks
        # where the prescaler is 0.
        elif SLOWDOWN > 2 and scounter != 0:
            pass

        elif tstate == tb_state.START:
            # A few cycles reset low
            if tbi < 1:
                # NOTE(review): reads the pending .next value rather than
                # the current value of tbi -- confirm this is intended.
                tbi.next = tbi.next + 1
            else:
                reset.next = 1
                tstate.next = tb_state.WRITE
                tbi.next = 0

        # HALT: steady green LED; no transition out of this state.
        elif tstate == tb_state.HALT:
            led0_g.next = 1
            led2_r.next = 0
            led1_b.next = 0

        elif tstate == tb_state.FAIL:
            # Failure: blink all color leds
            led0_g.next = not led0_g
            led2_r.next = not led2_r
            led1_b.next = o_done

        # WRITE: load the zlib reference stream into the DUT, one byte per
        # step, at address tbi.
        elif tstate == tb_state.WRITE:
            if tbi < len(CDATA):
                # print(tbi)
                o_led.next = tbi
                led1_b.next = o_done
                led2_r.next = not led2_r
                i_mode.next = WRITE
                i_data.next = CDATA[tbi]
                i_waddr.next = tbi
                tbi.next = tbi + 1
            else:
                i_mode.next = IDLE
                led2_r.next = 0
                tstate.next = tb_state.DECOMPRESS

        # DECOMPRESS: issue the start-decompress command for one step.
        elif tstate == tb_state.DECOMPRESS:
            i_mode.next = STARTD
            tstate.next = tb_state.WAIT

        # WAIT: drop back to IDLE, then wait for o_done.  o_done is only
        # accepted once i_mode already reads IDLE (presumably so that the
        # DUT has seen STARTD first).  Then start reading at address 0.
        elif tstate == tb_state.WAIT:
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.VERIFY

        # VERIFY: compare the DUT output byte by byte with UDATA, spending
        # one wtick step after each new read address.
        elif tstate == tb_state.VERIFY:
            # print("VERIFY", o_data)
            led1_b.next = 0
            o_led.next = tbi
            """
            Note that the read can also be pipelined in a tight loop
            without the WTICK delay, but this will not work with
            SLOWDOWN > 1
            """
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                led2_r.next = not led2_r
                ud1= UDATA[tbi]
                # print(o_byte, ud1)
                if o_byte != ud1:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
                    # resume.next = 1
                    # tstate.next = tb_state.PAUSE
                    tstate.next = tb_state.FAIL
                    # tstate.next = tb_state.RESET
                    raise Error("bad result")
                else:
                    pass
                    # print(tbi, o_data)
                i_raddr.next = tbi + 1
                tbi.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("DECOMPRESS test OK!, pausing", tbi)
                i_mode.next = IDLE
                tbi.next = 0
                # Skip the compression half when COMPRESS is disabled.
                if not COMPRESS:
                    tstate.next = tb_state.CPAUSE
                else:
                    tstate.next = tb_state.PAUSE
                # tstate.next = tb_state.HALT
                # state.next = tb_state.CPAUSE
                resume.next = 1
                # tstate.next = tb_state.CWRITE

        # PAUSE: blink red while `resume` counts up; continue with the
        # compress test once the 6-bit counter has wrapped back to 0.
        elif tstate == tb_state.PAUSE:
            led2_r.next = 0
            if resume == 0:
                print("--------------COMPRESS-------------")
                tbi.next = 0
                led0_g.next = 0
                tstate.next = tb_state.CWRITE
                # tstate.next = tb_state.RESET
            else:
                led2_r.next = not led2_r
                resume.next = resume + 1

        #####################################
        # COMPRESS TEST
        #####################################

        # CWRITE: load the uncompressed data into the DUT, one byte per step.
        elif tstate == tb_state.CWRITE:
            o_led.next = tbi
            if tbi < len(UDATA):
                # print(tbi)
                led2_r.next = 0
                led1_b.next = not led1_b
                i_mode.next = WRITE
                i_data.next = UDATA[tbi]
                i_waddr.next = tbi
                tbi.next = tbi + 1
            else:
                print("wrote bytes to compress", tbi)
                i_mode.next = IDLE
                tstate.next = tb_state.COMPRESS

        # COMPRESS: issue the start-compress command for one step.
        elif tstate == tb_state.COMPRESS:
            i_mode.next = STARTC
            tstate.next = tb_state.CWAIT

        # CWAIT: first step returns i_mode to IDLE; after that wait for
        # o_done, latch the result length and start reading at address 0.
        elif tstate == tb_state.CWAIT:
            led2_r.next = not led2_r
            if i_mode == STARTC:
                print("WAIT COMPRESS")
                i_mode.next = IDLE
                led1_b.next = 0
            elif o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                tstate.next = tb_state.CRESULT
                wtick.next = True

        # verify compression
        # CRESULT: copy the compressed output back into the DUT input.
        # Steps alternate between READ (capture o_byte into `copy`, request
        # address tbi) and WRITE (store `copy` at address tbi - 1).
        # NOTE(review): the writes cover addresses 0 .. resultlen - 2 only,
        # so the final output byte appears never to be copied back --
        # presumably harmless for the decompress check; confirm.
        elif tstate == tb_state.CRESULT:
            # print("COPY COMPRESS RESULT", tbi, o_data)
            led2_r.next = 0
            o_led.next = tbi
            if wtick:
                if tbi > 0:
                    i_mode.next = WRITE
                    i_data.next = copy
                    i_waddr.next = tbi - 1
                wtick.next = False
                tbi.next = tbi + 1
            elif tbi < resultlen:
                i_mode.next = READ
                led1_b.next = not led1_b
                i_raddr.next = tbi
                copy.next = o_byte
                wtick.next = True
            else:
                print("Compress output bytes copied to input", resultlen, tbi - 1)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.VDECOMPRESS

        # VDECOMPRESS: decompress the DUT's own compressed output.
        elif tstate == tb_state.VDECOMPRESS:
            print("start decompress of test compression")
            i_mode.next = STARTD
            tstate.next = tb_state.VWAIT

        # VWAIT: same handshake as WAIT, continuing with CVERIFY.
        elif tstate == tb_state.VWAIT:
            led2_r.next = 0
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("DONE DECOMPRESS VERIFY", o_oprogress)
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.CVERIFY

        # CVERIFY: compare the round-tripped data with UDATA, using the same
        # wtick scheme as VERIFY.
        elif tstate == tb_state.CVERIFY:
            # print("COMPRESS VERIFY", tbi, o_byte)
            led1_b.next = 0
            led2_r.next = not led2_r
            o_led.next = tbi
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                ud2 = UDATA[tbi]
                # print(tbi, o_byte, ud2)
                if o_byte != ud2:
                    tstate.next = tb_state.RESET
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
                    raise Error("bad result")
                    # NOTE(review): not reached in Python simulation because
                    # of the raise above; it may still matter in the
                    # converted HDL -- confirm before removing.
                    tstate.next = tb_state.FAIL
                tbi.next = tbi + 1
                i_raddr.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("ALL OK!", tbi)
                led2_r.next = 0
                i_mode.next = IDLE
                resume.next = 1
                """
                """
                tstate.next = tb_state.CPAUSE
                # tstate.next = tb_state.HALT

        # CPAUSE: end of one complete pass.  With SLOWDOWN <= 4 the
        # simulation is stopped here; otherwise blink green while `resume`
        # counts, then bump o_led and start over from RESET.
        elif tstate == tb_state.CPAUSE:
            if SLOWDOWN <= 4:
                raise StopSimulation()
            if resume == 0:
                print("--------------RESET-------------")
                o_led.next = o_led + 1
                tstate.next = tb_state.RESET
            else:
                led0_g.next = not led0_g
                resume.next = resume + 1

        # The string below is a disabled simulation time limit.
        """
        if now() > 50000:
            raise StopSimulation()
        """

    # Only the SLOWDOWN == 1 variant drives i_clk itself; otherwise the
    # clock has to be supplied from outside the block.
    if SLOWDOWN == 1:
        return clkgen, dut, count, logic
    else:
        return dut, count, logic
 
 
# ---------------------------------------------------------------------------
# Script section.  Everything below runs unconditionally, in this order;
# SLOWDOWN must be set before each bench is instantiated because the bench
# reads it while it is being built.
# ---------------------------------------------------------------------------

# Step 1: convert the throttled bench (no internal clock generator).
SLOWDOWN = 22

tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))

tb.convert(initial_values=False)

# Step 2: convert the full-speed bench, which includes its own clock.
SLOWDOWN = 1
tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
print("convert SLOWDOWN: ", SLOWDOWN)
tb.convert(name="test_fast_bench", initial_values=True)
# Disabled: run the converted fast bench under Icarus Verilog.
#   os.system("iverilog -o test_deflate " +
#             "test_fast_bench.v dump.v; " +
#             "vvp test_deflate")

# Step 3: run the MyHDL simulation tests.
print("Start Unit test")
unittest.main(verbosity=2)
 

Compare with Previous | Blame | View Log

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.