OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Blame information for rev 5

Go to most recent revision | Details | Compare with Previous | View Log

Line No. Rev Author Line
1 2 tomtor
import unittest
2
import os
3
import zlib
4
import random
5
 
6
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
7
                  Cosimulation, block, instance, StopSimulation, modbv, \
8
                  always, always_seq, always_comb, enum, Error
9
 
10 5 tomtor
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, \
11
                    CWINDOW, COMPRESS
12 2 tomtor
 
13
# Used by the unit test's windowed write loop: input byte ``i`` is only
# written while ``o_iprogress > i - MAXW``.
MAXW = 2 * CWINDOW

# Set to True to run the tests against the generated Verilog through
# Icarus Verilog co-simulation instead of the MyHDL model.
COSIMULATION = False

if not COSIMULATION:
    from deflate import deflate
else:
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                o_byte, i_addr, clk, reset):
        """Co-simulation stand-in with the same port list as ``deflate``.

        Compiles ``deflate.v`` and ``tb_deflate.v`` with ``iverilog`` into
        an image named ``deflate`` and returns a MyHDL ``Cosimulation``
        that runs it under ``vvp -m ./myhdl`` with every port connected by
        name.

        Raises:
            subprocess.CalledProcessError: ``iverilog`` exited non-zero.
            OSError: ``iverilog`` could not be started.
        """
        # Local import: only the co-simulation path needs it.
        import subprocess

        print("Cosimulation")
        # Fail loudly if the compile fails; otherwise vvp would run a
        # stale (or missing) image.  Append "dump.v" to the list to get
        # waveform dumps.
        subprocess.run(
            ["iverilog", "-o", "deflate", "deflate.v", "tb_deflate.v"],
            check=True,
        )
        return Cosimulation("vvp -m ./myhdl deflate",
                            i_mode=i_mode, o_done=o_done,
                            i_data=i_data, o_iprogress=o_iprogress,
                            o_oprogress=o_oprogress,
                            o_byte=o_byte, i_addr=i_addr,
                            clk=clk, reset=reset)
34
 
35
 
36
def test_data(m):
    """Build the uncompressed / zlib-compressed data pair for test mode ``m``.

    Modes:
        0: 100 identical "Hello World! 1 " items joined by spaces
        1: 5 padded "Hello World! <i>" items joined by spaces
        2: 100 "Hi: <random number below 0x1000> " items joined by spaces
        3: 100 random byte values
        4: 1000 random '0' / '1' characters

    The data is truncated to ``IBSIZE - 4 - 20`` bytes before it is
    compressed with ``zlib.compress``.

    Returns:
        Tuple ``(b_data, zl_data)``: the raw bytes and their zlib stream.

    Raises:
        Error: ``m`` is not one of the modes above.
    """
    print("MODE", m)

    if m == 3:
        # Pure binary mode: no text encoding step.
        b_data = bytes(random.randrange(0, 0x100) for _ in range(100))
    else:
        if m == 0:
            separator = " "
            pieces = ("Hello World! " + str(1) + " " for _ in range(100))
        elif m == 1:
            separator = " "
            # alternative: "Hello World! " + str(k) + " "
            pieces = ("   Hello World! " + str(k) + "     "
                      for k in range(5))
        elif m == 2:
            separator = " "
            pieces = ("Hi: " + str(random.randrange(0, 0x1000)) + " "
                      for _ in range(100))
        elif m == 4:
            separator = ""
            pieces = (str(random.randrange(0, 2)) for _ in range(1000))
        else:
            raise Error("unknown test mode")
        b_data = separator.join(pieces).encode('utf-8')

    limit = IBSIZE - 4 - 20
    b_data = b_data[:limit]
    zl_data = zlib.compress(b_data)
    print("From %d to %d bytes" % (len(b_data), len(zl_data)))
    print(zl_data)
    return b_data, zl_data
65
 
66
 
67
class TestDeflate(unittest.TestCase):
    """Simulation-based round-trip test of the deflate design."""

    def testMain(self):
        """Run the decompress / compress / decompress sequence per mode.

        For every test mode the stimulus generator below:
          1. decompresses the zlib reference stream in the DUT and compares
             the result with the original data,
          2. compresses the original data in the DUT and checks the output
             with ``zlib.decompress``,
          3. feeds the DUT's compressed output back in, decompresses it and
             compares again with the original data.
        """

        def test_decompress(i_mode, o_done, i_data, o_iprogress,
                            o_oprogress, o_byte, i_addr, clk, reset):
            """Stimulus generator; drives ``clk`` itself, 5 time units/edge.

            ``mode`` is read from the enclosing ``testMain`` loop.
            """

            def tick():
                clk.next = not clk

            def cycle():
                # One full clock period: two edges, 5 time units apart.
                tick()
                yield delay(5)
                tick()
                yield delay(5)

            def start(command):
                # Present a start command for one clock period.
                i_mode.next = command
                yield from cycle()
                i_mode.next = IDLE

            def write_all(payload):
                # Write payload[k] to address k, one byte per clock period.
                i_mode.next = WRITE
                for addr, value in enumerate(payload):
                    i_data.next = value
                    i_addr.next = addr
                    yield from cycle()
                i_mode.next = IDLE

            def wait_done():
                # Clock the DUT until o_done is set; print start/end time.
                print(now())
                while not o_done:
                    yield from cycle()
                print(now())

            def read_result():
                # Read o_oprogress bytes back from the DUT, one per period.
                i_mode.next = READ
                collected = []
                for addr in range(o_oprogress):
                    i_addr.next = addr
                    yield from cycle()
                    collected.append(bytes([o_byte]))
                i_mode.next = IDLE
                return b''.join(collected)

            print("")
            print("==========================")
            print("START TEST MODE", mode)
            print("==========================")

            b_data, zl_data = test_data(mode)

            # Reset pulse: low for the first edge, high from the second on.
            reset.next = 0
            tick()
            yield delay(5)
            reset.next = 1
            tick()
            yield delay(5)

            print("STARTD")
            yield from start(STARTD)

            # Stream the compressed input while decompression is already
            # running; a byte is only written while the DUT's input
            # progress is within MAXW of the write position.
            # (Alternative without sliding window: write_all(zl_data)
            # first, then start(STARTD).)
            print("WRITE")
            i_mode.next = WRITE
            i = 0
            while i < len(zl_data):
                if o_iprogress > i - MAXW:
                    i_data.next = zl_data[i]
                    i_addr.next = i
                    i = i + 1
                # else: no space yet, just keep clocking
                yield from cycle()
            i_mode.next = IDLE
            print("Wrote input, wait for end of decompression")

            yield from wait_done()

            print("GOT", o_oprogress)
            d_data = yield from read_result()

            self.assertEqual(b_data, d_data, "decompress does NOT match")
            print(len(d_data), len(zl_data))

            print("==========COMPRESS TEST=========")

            yield from write_all(b_data)
            yield from start(STARTC)
            yield from wait_done()

            print("last", o_oprogress)
            c_data = yield from read_result()

            print("b_data:", len(b_data), b_data)
            print("c_data:", len(c_data), c_data)
            print("zl_data:", len(zl_data), zl_data)

            # Check the hardware output against the reference decompressor
            # instead of only printing it.
            zlib_result = zlib.decompress(c_data)
            print("zlib test:", zlib_result)
            self.assertEqual(b_data, zlib_result,
                             "zlib decompress of compress output "
                             "does NOT match")

            print("WRITE COMPRESSED RESULT")
            yield from write_all(c_data)

            print("STARTD after Compress")
            yield from start(STARTD)
            yield from wait_done()

            d_data = yield from read_result()

            self.assertEqual(b_data, d_data,
                             "decompress after compress does NOT match")
            print(len(b_data), len(zl_data), len(c_data))

        for loop in range(1):
            for mode in range(5):
                self.runTests(test_decompress)

    def runTests(self, test):
        """Helper method to run the actual tests.

        Creates a fresh set of port signals, instantiates the DUT and the
        stimulus generator returned by ``test`` on those signals, and runs
        both in a MyHDL ``Simulation``.
        """

        i_mode = Signal(intbv(0)[3:])
        o_done = Signal(bool(0))

        i_data = Signal(intbv()[8:])
        o_byte = Signal(intbv()[8:])
        o_iprogress = Signal(intbv()[LBSIZE:])
        o_oprogress = Signal(intbv()[LBSIZE:])
        i_addr = Signal(intbv()[LBSIZE:])

        clk = Signal(bool(0))
        reset = ResetSignal(1, 0, True)

        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                      o_byte, i_addr, clk, reset)

        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                     o_byte, i_addr, clk, reset)
        sim = Simulation(dut, check)
        # traceSignals(dut)
        sim.run(quiet=1)
286
 
287
 
288
# Read by test_deflate_bench at elaboration time:
#   - bit width of the ``scounter`` prescaler,
#   - when > 2 the state machine only advances while ``scounter == 0``,
#   - when <= 4 the CPAUSE state raises StopSimulation,
#   - when == 1 the bench also returns its own clock generator.
SLOWDOWN = 1


@block
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):
    """Self-checking state-machine test bench around the deflate core.

    Uses the mode-1 vectors from ``test_data``.  The ``logic`` process:

      1. RESET / START: drives ``reset`` low, then high.
      2. WRITE / DECOMPRESS / WAIT / VERIFY: writes the zlib reference
         stream into the DUT, issues STARTD, waits for ``o_done`` and
         compares the DUT output byte by byte with the original data.
      3. PAUSE, CWRITE / COMPRESS / CWAIT (skipped when ``COMPRESS`` is
         false): writes the original data and issues STARTC.
      4. CRESULT / VDECOMPRESS / VWAIT / CVERIFY: copies the DUT's
         compressed output back to its input, issues STARTD and compares
         the result with the original data again.
      5. CPAUSE: raises StopSimulation when ``SLOWDOWN <= 4``; otherwise
         counts ``resume`` around to zero and restarts from RESET.

    A mismatch raises ``Error("bad result")``.

    Args:
        i_clk: clock; driven by the internal ``clkgen`` only when
            ``SLOWDOWN == 1``.
        o_led: progress output, mostly loaded with the byte index ``tbi``.
        led0_g, led1_b, led2_r: single-bit status outputs toggled by the
            individual states.

    Returns:
        The instances making up the bench (``clkgen`` included only when
        ``SLOWDOWN == 1``).
    """

    u_data, c_data = test_data(1)

    # Tuples so the data can be indexed as constant tables in ``logic``.
    CDATA = tuple(c_data)
    UDATA = tuple(u_data)

    i_mode = Signal(intbv(0)[3:])
    o_done = Signal(bool(0))

    i_data = Signal(intbv()[8:])
    o_byte = Signal(intbv()[8:])
    o_iprogress = Signal(intbv()[LBSIZE:])
    o_oprogress = Signal(intbv()[LBSIZE:])
    resultlen = Signal(intbv()[LBSIZE:])
    i_addr = Signal(intbv()[LBSIZE:])

    reset = ResetSignal(1, 0, True)

    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                  o_byte, i_addr, i_clk, reset)

    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
                    'FAIL', 'HALT')
    tstate = Signal(tb_state.RESET)

    # Byte index shared by all write / read / verify loops.
    tbi = Signal(modbv(0)[15:])
    # Holds one byte read from the DUT until it is written back (CRESULT).
    copy = Signal(intbv()[8:])

    scounter = Signal(modbv(0)[SLOWDOWN:])
    counter = Signal(modbv(0)[16:])

    # Set for one step after an address change so the read data is only
    # sampled on the following step.
    wtick = Signal(bool(0))

    # Pause counter: set to 1, incremented each step, pause ends when it
    # wraps to 0.
    resume = Signal(modbv(0)[6:])

    @instance
    def clkgen():
        i_clk.next = 0
        while True:
            yield delay(5)
            i_clk.next = not i_clk

    @always(i_clk.posedge)
    def count():
        # o_led.next = counter
        if scounter == 0:
            counter.next = counter + 1
        scounter.next = scounter + 1

    @always(i_clk.posedge)
    def logic():

        if tstate == tb_state.RESET:
            print("RESET", counter)
            reset.next = 0
            led0_g.next = 0
            led1_b.next = 0
            led2_r.next = 0
            tbi.next = 0
            tstate.next = tb_state.START

        elif SLOWDOWN > 2 and scounter != 0:
            # Slowed-down build: only act when the prescaler is at zero.
            pass

        elif tstate == tb_state.START:
            # A few cycles reset low
            if tbi < 1:
                # Read the current value, like every other increment here
                # (was: tbi.next + 1).
                tbi.next = tbi + 1
            else:
                reset.next = 1
                tstate.next = tb_state.WRITE
                tbi.next = 0

        elif tstate == tb_state.HALT:
            led0_g.next = 1
            led2_r.next = 0
            led1_b.next = 0

        elif tstate == tb_state.FAIL:
            # Failure: blink all color leds
            led0_g.next = not led0_g
            led2_r.next = not led2_r
            led1_b.next = o_done

        elif tstate == tb_state.WRITE:
            if tbi < len(CDATA):
                # print(tbi)
                o_led.next = tbi
                led1_b.next = o_done
                led2_r.next = not led2_r
                i_mode.next = WRITE
                i_data.next = CDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                i_mode.next = IDLE
                led2_r.next = 0
                tstate.next = tb_state.DECOMPRESS

        elif tstate == tb_state.DECOMPRESS:
            i_mode.next = STARTD
            tstate.next = tb_state.WAIT

        elif tstate == tb_state.WAIT:
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("result len", o_oprogress)
                # This HALT assignment is overridden by the VERIFY
                # assignment further down in this branch.
                if o_oprogress == 0x4F:
                    tstate.next = tb_state.HALT
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.VERIFY
                # Alternative: HALT when o_oprogress == 0x4F, else FAIL.

        elif tstate == tb_state.VERIFY:
            # print("VERIFY", o_data)
            led1_b.next = 0
            o_led.next = tbi
            # Note that the read can also be pipelined in a tight loop
            # without the WTICK delay, but this will not work with
            # SLOWDOWN > 1
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                led2_r.next = not led2_r
                ud1 = UDATA[tbi]
                # print(o_byte, ud1)
                if o_byte != ud1:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
                    # resume.next = 1
                    # tstate.next = tb_state.PAUSE
                    tstate.next = tb_state.FAIL
                    # tstate.next = tb_state.RESET
                    raise Error("bad result")
                i_addr.next = tbi + 1
                tbi.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("DECOMPRESS test OK!, pausing", tbi)
                i_mode.next = IDLE
                tbi.next = 0
                if not COMPRESS:
                    tstate.next = tb_state.CPAUSE
                else:
                    tstate.next = tb_state.PAUSE
                # tstate.next = tb_state.HALT
                # state.next = tb_state.CPAUSE
                resume.next = 1
                # tstate.next = tb_state.CWRITE

        elif tstate == tb_state.PAUSE:
            led2_r.next = 0
            if resume == 0:
                print("--------------COMPRESS-------------")
                tbi.next = 0
                led0_g.next = 0
                tstate.next = tb_state.CWRITE
                # tstate.next = tb_state.RESET
            else:
                led2_r.next = not led2_r
                resume.next = resume + 1

        #####################################
        # COMPRESS TEST
        #####################################

        elif tstate == tb_state.CWRITE:
            o_led.next = tbi
            if tbi < len(UDATA):
                # print(tbi)
                led2_r.next = 0
                led1_b.next = not led1_b
                i_mode.next = WRITE
                i_data.next = UDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                print("wrote bytes to compress", tbi)
                i_mode.next = IDLE
                tstate.next = tb_state.COMPRESS

        elif tstate == tb_state.COMPRESS:
            i_mode.next = STARTC
            tstate.next = tb_state.CWAIT

        elif tstate == tb_state.CWAIT:
            led2_r.next = not led2_r
            if i_mode == STARTC:
                print("WAIT COMPRESS")
                i_mode.next = IDLE
                led1_b.next = 0
            elif o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                tstate.next = tb_state.CRESULT
                wtick.next = True

        # verify compression
        elif tstate == tb_state.CRESULT:
            # print("COPY COMPRESS RESULT", tbi, o_data)
            led2_r.next = 0
            o_led.next = tbi
            if wtick:
                # Write back the byte captured on the previous read step.
                if tbi > 0:
                    i_mode.next = WRITE
                    i_data.next = copy
                    i_addr.next = tbi - 1
                wtick.next = False
                tbi.next = tbi + 1
            elif tbi < resultlen:
                i_mode.next = READ
                led1_b.next = not led1_b
                i_addr.next = tbi
                copy.next = o_byte
                wtick.next = True
            else:
                print("Compress output bytes copied to input", resultlen, tbi - 1)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.VDECOMPRESS

        elif tstate == tb_state.VDECOMPRESS:
            print("start decompress of test compression")
            i_mode.next = STARTD
            tstate.next = tb_state.VWAIT

        elif tstate == tb_state.VWAIT:
            led2_r.next = 0
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("DONE DECOMPRESS VERIFY", o_oprogress)
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.CVERIFY

        elif tstate == tb_state.CVERIFY:
            # print("COMPRESS VERIFY", tbi, o_byte)
            led1_b.next = 0
            led2_r.next = not led2_r
            o_led.next = tbi
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                ud2 = UDATA[tbi]
                # print(tbi, o_byte, ud2)
                if o_byte != ud2:
                    tstate.next = tb_state.RESET
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
                    raise Error("bad result")
                    # NOTE(review): unreachable in Python simulation;
                    # presumably still emitted into the converted Verilog,
                    # so confirm before removing.
                    tstate.next = tb_state.FAIL
                tbi.next = tbi + 1
                i_addr.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("ALL OK!", tbi)
                led2_r.next = 0
                i_mode.next = IDLE
                resume.next = 1
                tstate.next = tb_state.CPAUSE
                # tstate.next = tb_state.HALT

        elif tstate == tb_state.CPAUSE:
            if SLOWDOWN <= 4:
                raise StopSimulation()
            if resume == 0:
                print("--------------RESET-------------")
                o_led.next = o_led + 1
                tstate.next = tb_state.RESET
            else:
                led0_g.next = not led0_g
                resume.next = resume + 1

        # Optional time limit:
        # if now() > 50000:
        #     raise StopSimulation()

    if SLOWDOWN == 1:
        return clkgen, dut, count, logic
    else:
        return dut, count, logic
602
 
603
 
604
# --- Conversion with the slowed-down state machine (SLOWDOWN = 22) ---
# SLOWDOWN must be assigned before the bench is elaborated: the bench reads
# the module-level value.
SLOWDOWN = 22
tb = test_deflate_bench(
    Signal(bool(0)),                          # i_clk
    Signal(intbv(0)[4:]),                     # o_led
    *(Signal(bool(0)) for _ in range(3))      # led0_g, led1_b, led2_r
)
tb.convert(initial_values=False)

# --- Conversion of the full-speed bench as "test_fast_bench" ---
SLOWDOWN = 1
tb = test_deflate_bench(
    Signal(bool(0)),
    Signal(intbv(0)[4:]),
    *(Signal(bool(0)) for _ in range(3))
)
print("convert SLOWDOWN: ", SLOWDOWN)
tb.convert(name="test_fast_bench", initial_values=True)
# To run the converted bench under Icarus Verilog:
#   iverilog -o test_deflate test_fast_bench.v dump.v; vvp test_deflate

# --- Unit tests ---
print("Start Unit test")
unittest.main(verbosity=2)

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.