OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Rev 3

Go to most recent revision | Compare with Previous | Blame | View Log

import unittest
import os
import zlib
import random
 
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
                  Cosimulation, block, instance, StopSimulation, modbv, \
                  always, always_seq, always_comb, enum, Error
 
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, CWINDOW
 
MAXW = 2 * CWINDOW
 
COSIMULATION = True
COSIMULATION = False
 
if not COSIMULATION:
    from deflate import deflate
else:
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                o_byte, i_addr, clk, reset):
        print("Cosimulation")
        cmd = "iverilog -o deflate " + \
              "deflate.v " + \
              "tb_deflate.v "  # "dump.v "
        os.system(cmd)
        return Cosimulation("vvp -m ./myhdl deflate",
                            i_mode=i_mode, o_done=o_done,
                            i_data=i_data, o_iprogress=o_iprogress,
                            o_oprogress=o_oprogress,
                            o_byte=o_byte, i_addr=i_addr,
                            clk=clk, reset=reset)
 
 
def test_data(m):
    print("MODE", m)
    if m == 0:
        str_data = " ".join(["Hello World! " + str(1) + " "
                             for i in range(100)])
        b_data = str_data.encode('utf-8')
    elif m == 1:
        str_data = " ".join(["   Hello World! " + str(i) + "     "
        # str_data = " ".join(["Hello World! " + str(i) + " "
                             for i in range(5)])
        b_data = str_data.encode('utf-8')
    elif m == 2:
        str_data = " ".join(["Hi: " + str(random.randrange(0,0x1000)) + " "
                             for i in range(100)])
        b_data = str_data.encode('utf-8')
    elif m == 3:
        b_data = bytes([random.randrange(0,0x100) for i in range(100)])
    else:
        raise Error("unknown test mode")
    b_data = b_data[:IBSIZE-4]
    zl_data = zlib.compress(b_data)
    print("From %d to %d bytes" % (len(b_data), len(zl_data)))
    print(zl_data)
    return b_data, zl_data
 
 
class TestDeflate(unittest.TestCase):
 
    def testMain(self):
 
        def test_decompress(i_mode, o_done, i_data, o_iprogress,
                            o_oprogress, o_byte, i_addr, clk, reset):
 
            def tick():
                clk.next = not clk
 
            print("")
            print("==========================")
            print("START TEST MODE", mode)
            print("==========================")
 
            b_data, zl_data = test_data(mode)
 
            reset.next = 0
            tick()
            yield delay(5)
            reset.next = 1
            tick()
            yield delay(5)
 
            print("STARTD")
            i_mode.next = STARTD
            tick()
            yield delay(5)
            tick()
            yield delay(5)
            i_mode.next = IDLE
 
            print("WRITE")
            i_mode.next = WRITE
            i = 0
            while i < len(zl_data):
                if o_iprogress > i - MAXW:
                    # print("write", i)
                    i_data.next = zl_data[i]
                    i_addr.next = i
                    i = i + 1
                else:
                    print("Wait for space")
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            i_mode.next = IDLE
            print("Wrote input, wait for end of decompression")
 
            # alternative without sliding window:
            """
            print("WRITE")
            i_mode.next = WRITE
            for i in range(len(zl_data)):
                i_data.next = zl_data[i]
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            i_mode.next = IDLE
 
            print("STARTD")
            i_mode.next = STARTD
            tick()
            yield delay(5)
            tick()
            yield delay(5)
            i_mode.next = IDLE
            """
 
            print(now())
            while not o_done:
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            print(now())
 
            last = o_oprogress
            print("GOT", last)
            i_mode.next = READ
            d_data = []
            for i in range(last):
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
                d_data.append(bytes([o_byte]))
            i_mode.next = IDLE
 
            d_data = b''.join(d_data)
 
            self.assertEqual(b_data, d_data, "decompress does NOT match")
            print(len(d_data), len(zl_data))
 
            print("==========COMPRESS TEST=========")
 
            i_mode.next = WRITE
            for i in range(len(b_data)):
                i_data.next = b_data[i]
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            i_mode.next = IDLE
 
            i_mode.next = STARTC
            tick()
            yield delay(5)
            tick()
            yield delay(5)
            i_mode.next = IDLE
 
            print(now())
            while not o_done:
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            print(now())
 
            # raise Error("STOP")
 
            last = o_oprogress
            print("last", last)
            i_mode.next = READ
            c_data = []
            for i in range(last):
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
                c_data.append(bytes([o_byte]))
            i_mode.next = IDLE
 
            print("b_data:", len(b_data), b_data)
            c_data = b''.join(c_data)
            print("c_data:", len(c_data), c_data)
            print("zl_data:", len(zl_data), zl_data)
 
            print("zlib test:", zlib.decompress(c_data))
 
            print("WRITE COMPRESSED RESULT")
            i_mode.next = WRITE
            for i in range(len(c_data)):
                i_data.next = c_data[i]
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            i_mode.next = IDLE
 
            print("STARTD after Compress")
            i_mode.next = STARTD
            tick()
            yield delay(5)
            tick()
            yield delay(5)
            i_mode.next = IDLE
 
            print(now())
            while not o_done:
                tick()
                yield delay(5)
                tick()
                yield delay(5)
            print(now())
 
            last = o_oprogress
            i_mode.next = READ
            d_data = []
            for i in range(last):
                i_addr.next = i
                tick()
                yield delay(5)
                tick()
                yield delay(5)
                d_data.append(bytes([o_byte]))
            i_mode.next = IDLE
 
            d_data = b''.join(d_data)
 
            self.assertEqual(b_data, d_data, "decompress after compress does NOT match")
            print(len(b_data), len(zl_data), len(c_data))
 
        for loop in range(1):
            for mode in range(4):
                self.runTests(test_decompress)
 
    def runTests(self, test):
        """Helper method to run the actual tests."""
 
        i_mode = Signal(intbv(0)[3:])
        o_done = Signal(bool(0))
 
        i_data = Signal(intbv()[8:])
        o_byte = Signal(intbv()[8:])
        o_iprogress = Signal(intbv()[LBSIZE:])
        o_oprogress = Signal(intbv()[LBSIZE:])
        i_addr = Signal(intbv()[LBSIZE:])
 
        clk = Signal(bool(0))
        reset = ResetSignal(1, 0, True)
 
        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                      o_byte, i_addr, clk, reset)
 
        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                     o_byte, i_addr, clk, reset)
        sim = Simulation(dut, check)
        # traceSignals(dut)
        sim.run(quiet=1)
 
 
SLOWDOWN = 1
 
@block
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):
 
    u_data, c_data = test_data(1)
 
    CDATA = tuple(c_data)
    UDATA = tuple(u_data)
 
    i_mode = Signal(intbv(0)[3:])
    o_done = Signal(bool(0))
 
    i_data = Signal(intbv()[8:])
    o_byte = Signal(intbv()[8:])
    u_data = Signal(intbv()[8:])
    o_iprogress = Signal(intbv()[LBSIZE:])
    o_oprogress = Signal(intbv()[LBSIZE:])
    resultlen = Signal(intbv()[LBSIZE:])
    i_addr = Signal(intbv()[LBSIZE:])
 
    reset = ResetSignal(1, 0, True)
 
    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                  o_byte, i_addr, i_clk, reset)
 
    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
                    'FAIL', 'HALT')
    tstate = Signal(tb_state.RESET)
 
    tbi = Signal(modbv(0)[15:])
    copy = Signal(intbv()[8:])
 
    scounter = Signal(modbv(0)[SLOWDOWN:])
    counter = Signal(modbv(0)[16:])
 
    wtick = Signal(bool(0))
 
    resume = Signal(modbv(0)[6:])
 
    @instance
    def clkgen():
        i_clk.next = 0
        while True:
            yield delay(5)
            i_clk.next = not i_clk
 
    @always(i_clk.posedge)
    def count():
        # o_led.next = counter
        if scounter == 0:
            counter.next = counter + 1
        scounter.next = scounter + 1
 
    @always(i_clk.posedge)
    def logic():
 
        if tstate == tb_state.RESET:
            print("RESET", counter)
            reset.next = 0
            led0_g.next = 0
            led1_b.next = 0
            led2_r.next = 0
            tbi.next = 0
            tstate.next = tb_state.START
 
        elif SLOWDOWN > 2 and scounter != 0:
            pass
 
        elif tstate == tb_state.START:
            # A few cycles reset low
            if tbi < 1:
                tbi.next = tbi.next + 1
            else:
                reset.next = 1
                tstate.next = tb_state.WRITE
                tbi.next = 0
 
        elif tstate == tb_state.HALT:
            led0_g.next = 1
            led2_r.next = 0
            led1_b.next = 0
 
        elif tstate == tb_state.FAIL:
            # Failure: blink all color leds
            led0_g.next = not led0_g
            led2_r.next = not led2_r
            led1_b.next = o_done
 
        elif tstate == tb_state.WRITE:
            if tbi < len(CDATA):
                # print(tbi)
                o_led.next = tbi
                led1_b.next = o_done
                led2_r.next = not led2_r
                i_mode.next = WRITE
                i_data.next = CDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                i_mode.next = IDLE
                led2_r.next = 0
                tstate.next = tb_state.DECOMPRESS
 
        elif tstate == tb_state.DECOMPRESS:
            i_mode.next = STARTD
            tstate.next = tb_state.WAIT
 
        elif tstate == tb_state.WAIT:
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("result len", o_oprogress)
                if o_oprogress == 0x4F:
                    tstate.next = tb_state.HALT
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.VERIFY
                """
                if o_oprogress == 0x4F:
                    tstate.next = tb_state.HALT
                else:
                    tstate.next = tb_state.FAIL
                """
 
        elif tstate == tb_state.VERIFY:
            # print("VERIFY", o_data)
            led1_b.next = 0
            o_led.next = tbi
            """
            Note that the read can also be pipelined in a tight loop
            without the WTICK delay, but this will not work with
            SLOWDOWN > 1
            """
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                led2_r.next = not led2_r
                ud1= UDATA[tbi]
                # print(o_byte, ud1)
                if o_byte != ud1:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
                    # resume.next = 1
                    # tstate.next = tb_state.PAUSE
                    tstate.next = tb_state.FAIL
                    # tstate.next = tb_state.RESET
                    raise Error("bad result")
                else:
                    pass
                    # print(tbi, o_data)
                i_addr.next = tbi + 1
                tbi.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("DECOMPRESS test OK!, pausing", tbi)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.PAUSE
                # tstate.next = tb_state.HALT
                # state.next = tb_state.CPAUSE
                resume.next = 1
                # tstate.next = tb_state.CWRITE
 
        elif tstate == tb_state.PAUSE:
            led2_r.next = 0
            if resume == 0:
                print("--------------COMPRESS-------------")
                tbi.next = 0
                led0_g.next = 0
                tstate.next = tb_state.CWRITE
                # tstate.next = tb_state.RESET
            else:
                led2_r.next = not led2_r
                resume.next = resume + 1
 
        #####################################
        # COMPRESS TEST
        #####################################
 
        elif tstate == tb_state.CWRITE:
            o_led.next = tbi
            if tbi < len(UDATA):
                # print(tbi)
                led2_r.next = 0
                led1_b.next = not led1_b
                i_mode.next = WRITE
                i_data.next = UDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                print("wrote bytes to compress", tbi)
                i_mode.next = IDLE
                tstate.next = tb_state.COMPRESS
 
        elif tstate == tb_state.COMPRESS:
            i_mode.next = STARTC
            tstate.next = tb_state.CWAIT
 
        elif tstate == tb_state.CWAIT:
            led2_r.next = not led2_r
            if i_mode == STARTC:
                print("WAIT COMPRESS")
                i_mode.next = IDLE
                led1_b.next = 0
            elif o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                tstate.next = tb_state.CRESULT
                wtick.next = True
 
        # verify compression
        elif tstate == tb_state.CRESULT:
            # print("COPY COMPRESS RESULT", tbi, o_data)
            led2_r.next = 0
            o_led.next = tbi
            if wtick:
                if tbi > 0:
                    i_mode.next = WRITE
                    i_data.next = copy
                    i_addr.next = tbi - 1
                wtick.next = False
                tbi.next = tbi + 1
            elif tbi < resultlen:
                i_mode.next = READ
                led1_b.next = not led1_b
                i_addr.next = tbi
                copy.next = o_byte
                wtick.next = True
            else:
                print("Compress output bytes copied to input", resultlen, tbi - 1)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.VDECOMPRESS
 
        elif tstate == tb_state.VDECOMPRESS:
            print("start decompress of test compression")
            i_mode.next = STARTD
            tstate.next = tb_state.VWAIT
 
        elif tstate == tb_state.VWAIT:
            led2_r.next = 0
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("DONE DECOMPRESS VERIFY", o_oprogress)
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.CVERIFY
 
        elif tstate == tb_state.CVERIFY:
            # print("COMPRESS VERIFY", tbi, o_byte)
            led1_b.next = 0
            led2_r.next = not led2_r
            o_led.next = tbi
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                ud2 = UDATA[tbi]
                # print(tbi, o_byte, ud2)
                if o_byte != ud2:
                    tstate.next = tb_state.RESET
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
                    raise Error("bad result")
                    tstate.next = tb_state.FAIL
                tbi.next = tbi + 1
                i_addr.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("ALL OK!", tbi)
                led2_r.next = 0
                i_mode.next = IDLE
                resume.next = 1
                if SLOWDOWN <= 4:
                    raise StopSimulation()
                """
                """
                tstate.next = tb_state.CPAUSE
                # tstate.next = tb_state.HALT
 
        elif tstate == tb_state.CPAUSE:
            if resume == 0:
                print("--------------RESET-------------")
                o_led.next = o_led + 1
                tstate.next = tb_state.RESET
            else:
                led0_g.next = not led0_g
                resume.next = resume + 1
 
        """
        if now() > 50000:
            raise StopSimulation()
        """
 
    if SLOWDOWN == 1:
        return clkgen, dut, count, logic
    else:
        return dut, count, logic
 
 
if 1: # not COSIMULATION:
    SLOWDOWN = 22
 
    tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
 
    tb.convert(initial_values=False)
 
if 1:
    SLOWDOWN = 1
    tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                            Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
    print("convert SLOWDOWN: ", SLOWDOWN)
    tb.convert(name="test_fast_bench", initial_values=True)
    """
    os.system("iverilog -o test_deflate " +
              "test_fast_bench.v dump.v; " +
              "vvp test_deflate")
              """
if 1:
    print("Start Unit test")
    unittest.main(verbosity=2)
 

Go to most recent revision | Compare with Previous | Blame | View Log

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.