OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Blame information for rev 6

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 2 tomtor
import unittest
2
import os
3
import zlib
4
import random
5
 
6
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
7
                  Cosimulation, block, instance, StopSimulation, modbv, \
8
                  always, always_seq, always_comb, enum, Error
9
 
10 5 tomtor
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, \
11 6 tomtor
                    CWINDOW, COMPRESS, DECOMPRESS, OBSIZE, LMAX, LIBSIZE
12 2 tomtor
 
13 6 tomtor
# Maximum distance the streaming tests let the write index run ahead of
# the input progress reported by the DUT (see the ``o_iprogress > i - MAXW``
# checks in the test loops).
MAXW = CWINDOW

# Set to True to run the tests against the Verilog sources through Icarus
# Verilog co-simulation instead of the pure-Python MyHDL model.
COSIMULATION = False

if not COSIMULATION:
    from deflate import deflate
else:
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                o_byte, i_waddr, i_raddr, clk, reset):
        """Co-simulation stand-in with the same port list as ``deflate``.

        Compiles ``deflate.v`` and ``tb_deflate.v`` with ``iverilog`` into
        an image called ``deflate`` and returns a MyHDL ``Cosimulation``
        object that runs it under ``vvp`` with the given signals.

        Raises:
            RuntimeError: if the ``iverilog`` command reports a non-zero
                exit status, so that a stale or missing image is never
                handed to ``vvp``.
        """
        print("Cosimulation")
        cmd = "iverilog -o deflate " + \
              "deflate.v " + \
              "tb_deflate.v "  # "dump.v "
        status = os.system(cmd)
        if status != 0:
            raise RuntimeError("iverilog failed with status %d: %s"
                               % (status, cmd))
        return Cosimulation("vvp -m ./myhdl deflate",
                            i_mode=i_mode, o_done=o_done,
                            i_data=i_data, o_iprogress=o_iprogress,
                            o_oprogress=o_oprogress,
                            o_byte=o_byte, i_waddr=i_waddr, i_raddr=i_raddr,
                            clk=clk, reset=reset)
34
 
35
 
36 6 tomtor
def test_data(m, tlen=100, limit=False):
    """Generate a test payload and its zlib-compressed counterpart.

    Args:
        m: payload mode.
            0 - the same "Hello World! 1 " phrase repeated ``tlen`` times,
            1 - a padded "Hello World!" phrase with a running counter,
            2 - "Hi: <random number below 0x1000> " phrases,
            3 - ``tlen`` random bytes,
            4 - ``tlen`` random '0'/'1' characters.
        tlen: number of phrases (modes 0-2) or of bytes/characters
            (modes 3-4).
        limit: when truthy, the payload is cut to ``IBSIZE - 4 - 10`` bytes
            before it is compressed.

    Returns:
        Tuple ``(b_data, zl_data)``: the raw bytes and ``zlib.compress`` of
        those bytes.

    Raises:
        Error: for any mode other than 0..4.
    """
    print("MODE", m, tlen)
    count = range(tlen)

    if m == 0:
        phrases = ["Hello World! " + str(1) + " " for _ in count]
        raw = " ".join(phrases).encode('utf-8')
    elif m == 1:
        phrases = ["   Hello World! " + str(n) + "     " for n in count]
        raw = " ".join(phrases).encode('utf-8')
    elif m == 2:
        phrases = ["Hi: " + str(random.randrange(0, 0x1000)) + " "
                   for _ in count]
        raw = " ".join(phrases).encode('utf-8')
    elif m == 3:
        raw = bytes([random.randrange(0, 0x100) for _ in count])
    elif m == 4:
        bits = [str(random.randrange(0, 2)) for _ in count]
        raw = "".join(bits).encode('utf-8')
    else:
        raise Error("unknown test mode")

    if limit:
        raw = raw[:IBSIZE - 4 - 10]

    packed = zlib.compress(raw)
    print("From %d to %d bytes" % (len(raw), len(packed)))
    print(packed[:500])
    return raw, packed
65
 
66
 
67
class TestDeflate(unittest.TestCase):
    """Streaming decompress and compress tests for the ``deflate`` block."""

    def testMain(self):
        """Run the streaming test once for every ``test_data`` mode (0..4)."""

        def test_decompress(i_mode, o_done, i_data, o_iprogress,
                            o_oprogress, o_byte, i_waddr, i_raddr, clk, reset):
            """Stimulus generator that drives the DUT by hand-toggling clk.

            ``mode`` is read from the enclosing ``testMain`` loop.  Every
            ``tick()`` toggles ``clk`` and is followed by ``delay(5)``, so
            two tick/delay pairs make one 10-unit clock period (the cycle
            counts printed below divide elapsed time by 10).

            When ``DECOMPRESS`` is set, the zlib stream is written to the
            DUT while its output is read back and compared with the
            original bytes.  When ``COMPRESS`` is set, the original bytes
            are streamed in, and the DUT output is inflated with ``zlib``
            and compared with the input.
            """

            def tick():
                clk.next = not clk

            print("")
            print("==========================")
            print("START TEST MODE", mode)
            print("==========================")

            b_data, zl_data = test_data(mode, 25000)

            # The reset pulse is only applied for the first mode.
            if mode == 0:
                reset.next = 0
                tick()
                yield delay(5)
                reset.next = 1
                tick()
                yield delay(5)

            if DECOMPRESS:
                print("==========STREAMING DECOMPRESS TEST=========")

                print("STREAM LENGTH", len(zl_data))

                i_mode.next = IDLE
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("STARTD")
                i_mode.next = STARTD
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("WRITE")
                i = 0        # next input byte to write
                ri = 0       # next output byte to read
                sresult = []
                start = now()
                wait = 0     # iterations spent waiting for input space
                while True:
                    if ri >= 1000 and ri % 10000 == 0:
                        print(ri)
                    if ri < o_oprogress:
                        # Output is available: spend one clock on a READ.
                        did_read = 1
                        i_mode.next = READ
                        i_raddr.next = ri
                        tick()
                        yield delay(5)
                        tick()
                        yield delay(5)
                        ri = ri + 1
                    else:
                        did_read = 0

                    if i < len(zl_data):
                        # Only write while less than MAXW bytes ahead of
                        # what the DUT reports as consumed.
                        if o_iprogress > i - MAXW:
                            i_mode.next = WRITE
                            i_waddr.next = i
                            i_data.next = zl_data[i]
                            i = i + 1
                        else:
                            wait += 1
                    else:
                        i_mode.next = IDLE

                    tick()
                    yield delay(5)
                    tick()
                    yield delay(5)

                    if did_read:
                        sresult.append(bytes([o_byte]))

                    # Finished once the DUT is done and all output is read.
                    if o_done and o_oprogress == ri:
                        break

                i_mode.next = IDLE
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("IN/OUT/CYCLES/WAIT", len(zl_data), len(sresult),
                      (now() - start) // 10, wait)
                sresult = b''.join(sresult)
                self.assertEqual(b_data, sresult)
                print("Decompress OK!")

            # Guarded like the decompress test above (and like the COMPRESS
            # check in test_deflate_bench): do not exercise compression on a
            # build that has it disabled.
            if COMPRESS:
                print("==========STREAMING COMPRESS TEST=========")

                print("STARTC")
                i_mode.next = STARTC
                tick()
                yield delay(5)
                tick()
                yield delay(5)

                print("WRITE")
                i = 0
                ri = 0
                slen = 25000   # number of input bytes to stream in
                sresult = []
                wait = 0
                start = now()
                while True:
                    if ri < o_oprogress:
                        did_read = 1
                        i_mode.next = READ
                        i_raddr.next = ri
                        tick()
                        yield delay(5)
                        tick()
                        yield delay(5)
                        if ri % 10000 == 0:
                            print(ri)
                        ri = ri + 1
                    else:
                        did_read = 0

                    if i < slen:
                        if o_iprogress > i - MAXW:
                            i_mode.next = WRITE
                            i_waddr.next = i
                            # Wrap around when the payload is shorter
                            # than slen.
                            i_data.next = b_data[i % len(b_data)]
                            i = i + 1
                        else:
                            wait += 1
                    else:
                        i_mode.next = IDLE

                    tick()
                    yield delay(5)
                    tick()
                    yield delay(5)

                    if did_read:
                        sresult.append(bytes([o_byte]))

                    if o_done and o_oprogress == ri:
                        break

                i_mode.next = IDLE

                print("IN/OUT/CYCLES/WAIT", slen, len(sresult),
                      (now() - start) // 10, wait)
                sresult = b''.join(sresult)
                inflated = zlib.decompress(sresult)
                print("zlib test:", inflated[:60])
                rlen = min(len(b_data), slen)
                self.assertEqual(inflated[:rlen], b_data[:rlen])
                print("DONE!")

        for loop in range(1):
            for mode in range(5):
                self.runTests(test_decompress)

    def runTests(self, test):
        """Create fresh signals and a DUT, then simulate it with *test*.

        Args:
            test: generator function taking the same ten signals as
                ``deflate``; it supplies the stimulus and the checks.
        """

        i_mode = Signal(intbv(0)[3:])
        o_done = Signal(bool(0))

        i_data = Signal(intbv()[8:])
        o_byte = Signal(intbv()[8:])
        o_iprogress = Signal(intbv()[LMAX:])
        o_oprogress = Signal(intbv()[LMAX:])
        i_waddr = Signal(modbv()[LMAX:])
        i_raddr = Signal(modbv()[LMAX:])

        clk = Signal(bool(0))
        reset = ResetSignal(1, 0, True)

        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                      o_byte, i_waddr, i_raddr, clk, reset)

        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                     o_byte, i_waddr, i_raddr, clk, reset)
        sim = Simulation(dut, check)
        # traceSignals(dut)
        sim.run(quiet=1)
272
 
273
 
274
# Width of the `scounter` prescaler in test_deflate_bench.  The bench reads
# this global when it is instantiated, so the script at the end of the file
# changes it before each conversion.
SLOWDOWN = 1


@block
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):
    """Self-checking bench around ``deflate``, written as a state machine.

    The bench writes a zlib stream into the DUT, starts decompression and
    compares the output with the original data.  When ``COMPRESS`` is set
    it then writes the original data, starts compression, copies the
    compressed output back to the input, decompresses it and compares
    again.  A mismatch raises ``Error("bad result")``.

    Args:
        i_clk: clock; driven by the internal ``clkgen`` only when
            ``SLOWDOWN == 1``.
        o_led: progress output, loaded with the byte index ``tbi``.
        led0_g, led1_b, led2_r: status outputs toggled by the states.

    Returns:
        The generator instances, including ``clkgen`` only when
        ``SLOWDOWN == 1``.
    """

    # IBSIZE is passed as the (truthy) `limit` flag of test_data.
    u_data, c_data = test_data(1, 100, IBSIZE)

    CDATA = tuple(c_data)
    UDATA = tuple(u_data)

    i_mode = Signal(intbv(0)[3:])
    o_done = Signal(bool(0))

    i_data = Signal(intbv()[8:])
    o_byte = Signal(intbv()[8:])
    o_iprogress = Signal(intbv()[LMAX:])
    o_oprogress = Signal(intbv()[LMAX:])
    resultlen = Signal(intbv()[LMAX:])
    i_waddr = Signal(modbv()[LMAX:])
    i_raddr = Signal(modbv()[LBSIZE:])

    reset = ResetSignal(1, 0, True)

    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                  o_byte, i_waddr, i_raddr, i_clk, reset)

    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
                    'FAIL', 'HALT')
    tstate = Signal(tb_state.RESET)

    tbi = Signal(modbv(0)[15:])      # byte index within the current phase
    copy = Signal(intbv()[8:])       # byte in flight during CRESULT

    scounter = Signal(modbv(0)[SLOWDOWN:])
    counter = Signal(modbv(0)[16:])

    wtick = Signal(bool(0))          # alternates the read/verify sub-steps

    resume = Signal(modbv(0)[6:])    # pause counter; wraps back to 0

    @instance
    def clkgen():
        i_clk.next = 0
        while True:
            yield delay(5)
            i_clk.next = not i_clk

    @always(i_clk.posedge)
    def count():
        # o_led.next = counter
        if scounter == 0:
            counter.next = counter + 1
        scounter.next = scounter + 1

    @always(i_clk.posedge)
    def logic():

        if tstate == tb_state.RESET:
            print("RESET", counter)
            reset.next = 0
            led0_g.next = 0
            led1_b.next = 0
            led2_r.next = 0
            tbi.next = 0
            tstate.next = tb_state.START

        elif SLOWDOWN > 2 and scounter != 0:
            # Slowed-down build: only act when the prescaler wraps.
            pass

        elif tstate == tb_state.START:
            # A few cycles reset low
            if tbi < 1:
                # Use the current value, not the pending `.next` value.
                tbi.next = tbi + 1
            else:
                reset.next = 1
                tstate.next = tb_state.WRITE
                tbi.next = 0

        elif tstate == tb_state.HALT:
            led0_g.next = 1
            led2_r.next = 0
            led1_b.next = 0

        elif tstate == tb_state.FAIL:
            # Failure: blink all color leds
            led0_g.next = not led0_g
            led2_r.next = not led2_r
            led1_b.next = o_done

        elif tstate == tb_state.WRITE:
            if tbi < len(CDATA):
                o_led.next = tbi
                led1_b.next = o_done
                led2_r.next = not led2_r
                i_mode.next = WRITE
                i_data.next = CDATA[tbi]
                i_waddr.next = tbi
                tbi.next = tbi + 1
            else:
                i_mode.next = IDLE
                led2_r.next = 0
                tstate.next = tb_state.DECOMPRESS

        elif tstate == tb_state.DECOMPRESS:
            i_mode.next = STARTD
            tstate.next = tb_state.WAIT

        elif tstate == tb_state.WAIT:
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.VERIFY

        elif tstate == tb_state.VERIFY:
            led1_b.next = 0
            o_led.next = tbi
            # Note that the read can also be pipelined in a tight loop
            # without the WTICK delay, but this will not work with
            # SLOWDOWN > 1
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                led2_r.next = not led2_r
                ud1 = UDATA[tbi]
                if o_byte != ud1:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
                    tstate.next = tb_state.FAIL
                    raise Error("bad result")
                i_raddr.next = tbi + 1
                tbi.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("DECOMPRESS test OK!, pausing", tbi)
                i_mode.next = IDLE
                tbi.next = 0
                # Skip the compress phase when the core has no compressor.
                if not COMPRESS:
                    tstate.next = tb_state.CPAUSE
                else:
                    tstate.next = tb_state.PAUSE
                resume.next = 1

        elif tstate == tb_state.PAUSE:
            led2_r.next = 0
            if resume == 0:
                print("--------------COMPRESS-------------")
                tbi.next = 0
                led0_g.next = 0
                tstate.next = tb_state.CWRITE
            else:
                led2_r.next = not led2_r
                resume.next = resume + 1

        #####################################
        # COMPRESS TEST
        #####################################

        elif tstate == tb_state.CWRITE:
            o_led.next = tbi
            if tbi < len(UDATA):
                led2_r.next = 0
                led1_b.next = not led1_b
                i_mode.next = WRITE
                i_data.next = UDATA[tbi]
                i_waddr.next = tbi
                tbi.next = tbi + 1
            else:
                print("wrote bytes to compress", tbi)
                i_mode.next = IDLE
                tstate.next = tb_state.COMPRESS

        elif tstate == tb_state.COMPRESS:
            i_mode.next = STARTC
            tstate.next = tb_state.CWAIT

        elif tstate == tb_state.CWAIT:
            led2_r.next = not led2_r
            if i_mode == STARTC:
                print("WAIT COMPRESS")
                i_mode.next = IDLE
                led1_b.next = 0
            elif o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                tstate.next = tb_state.CRESULT
                wtick.next = True

        # verify compression
        elif tstate == tb_state.CRESULT:
            # Alternate between reading output byte `tbi` into `copy` and
            # writing the previously read byte back to input address
            # `tbi - 1`.
            led2_r.next = 0
            o_led.next = tbi
            if wtick:
                if tbi > 0:
                    i_mode.next = WRITE
                    i_data.next = copy
                    i_waddr.next = tbi - 1
                wtick.next = False
                tbi.next = tbi + 1
            elif tbi < resultlen:
                i_mode.next = READ
                led1_b.next = not led1_b
                i_raddr.next = tbi
                copy.next = o_byte
                wtick.next = True
            else:
                print("Compress output bytes copied to input", resultlen, tbi - 1)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.VDECOMPRESS

        elif tstate == tb_state.VDECOMPRESS:
            print("start decompress of test compression")
            i_mode.next = STARTD
            tstate.next = tb_state.VWAIT

        elif tstate == tb_state.VWAIT:
            led2_r.next = 0
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("DONE DECOMPRESS VERIFY", o_oprogress)
                tbi.next = 0
                i_raddr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.CVERIFY

        elif tstate == tb_state.CVERIFY:
            led1_b.next = 0
            led2_r.next = not led2_r
            o_led.next = tbi
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                ud2 = UDATA[tbi]
                if o_byte != ud2:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
                    # Enter FAIL before raising, as the VERIFY state does;
                    # the assignment used to sit after the raise.
                    tstate.next = tb_state.FAIL
                    raise Error("bad result")
                tbi.next = tbi + 1
                i_raddr.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("ALL OK!", tbi)
                led2_r.next = 0
                i_mode.next = IDLE
                resume.next = 1
                tstate.next = tb_state.CPAUSE

        elif tstate == tb_state.CPAUSE:
            # Fast builds end the simulation here; slow builds pause and
            # then restart from RESET.
            if SLOWDOWN <= 4:
                raise StopSimulation()
            if resume == 0:
                print("--------------RESET-------------")
                o_led.next = o_led + 1
                tstate.next = tb_state.RESET
            else:
                led0_g.next = not led0_g
                resume.next = resume + 1

        # Optional time limit for debugging:
        # if now() > 50000:
        #     raise StopSimulation()

    if SLOWDOWN == 1:
        return clkgen, dut, count, logic
    else:
        return dut, count, logic
581
 
582
 
583
# --- Conversion 1: slowed-down bench -----------------------------------
# SLOWDOWN is read by test_deflate_bench when it is instantiated, so it
# must be assigned before each call.
SLOWDOWN = 22

tb = test_deflate_bench(
    Signal(bool(0)),
    Signal(intbv(0)[4:]),
    Signal(bool(0)),
    Signal(bool(0)),
    Signal(bool(0)),
)
tb.convert(initial_values=False)

# --- Conversion 2: full-speed bench, written out as "test_fast_bench" ---
SLOWDOWN = 1
tb = test_deflate_bench(
    Signal(bool(0)),
    Signal(intbv(0)[4:]),
    Signal(bool(0)),
    Signal(bool(0)),
    Signal(bool(0)),
)
print("convert SLOWDOWN: ", SLOWDOWN)
tb.convert(name="test_fast_bench", initial_values=True)
# The converted bench can be simulated with Icarus Verilog (disabled):
#   os.system("iverilog -o test_deflate " +
#             "test_fast_bench.v dump.v; " +
#             "vvp test_deflate")

# --- Finally run the MyHDL unit tests ------------------------------------
print("Start Unit test")
unittest.main(verbosity=2)

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.