OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Blame information for rev 2

Go to most recent revision | Details | Compare with Previous | View Log

Line No. Rev Author Line
1 2 tomtor
import unittest
2
import os
3
import zlib
4
import random
5
 
6
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
7
                  Cosimulation, block, instance, StopSimulation, modbv, \
8
                  always, always_seq, always_comb, enum, Error
9
 
10
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, CWINDOW
11
 
12
# Write-ahead limit used by the decompression stimulus: input byte i is only
# written while o_iprogress > i - MAXW.
MAXW = 2 * CWINDOW

# Cosimulation switch. The second assignment wins, so cosimulation is
# currently disabled; swap or comment out a line to change that.
COSIMULATION = True
COSIMULATION = False

if COSIMULATION:
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                o_byte, i_addr, clk, reset):
        """Stand-in for the native block that drives the design via Icarus.

        Compiles deflate.v together with tb_deflate.v using iverilog and
        returns a Cosimulation object bound to the given signals.  The
        parameter list mirrors the call made for the imported ``deflate``.
        """
        print("Cosimulation")
        # "dump.v " can be appended to the command to include a dump module.
        os.system("iverilog -o deflate deflate.v tb_deflate.v ")
        ports = dict(i_mode=i_mode, o_done=o_done,
                     i_data=i_data, o_iprogress=o_iprogress,
                     o_oprogress=o_oprogress,
                     o_byte=o_byte, i_addr=i_addr,
                     clk=clk, reset=reset)
        return Cosimulation("vvp -m ./myhdl deflate", **ports)
else:
    from deflate import deflate
33
 
34
 
35
def test_data(m):
    """Build a test payload and its zlib-compressed form.

    m selects the payload:
      0 -- 100 copies of a fixed "Hello World! 1" phrase
      1 -- 5 space-padded "Hello World! <i>" phrases
      2 -- 100 "Hi: <random number below 0x1000>" phrases
      3 -- 100 random byte values

    The payload is truncated to IBSIZE - 4 bytes before compression.
    Returns the tuple (uncompressed bytes, zlib.compress() of those bytes).
    Raises Error for any other value of m.
    """
    print("MODE", m)
    if m == 3:
        b_data = bytes([random.randrange(0, 0x100) for _ in range(100)])
    else:
        if m == 0:
            # NOTE(review): this appends the constant 1, not the loop index;
            # presumably intentional (fully repetitive input) -- confirm.
            pieces = ["Hello World! " + str(1) + " " for _ in range(100)]
        elif m == 1:
            pieces = ["   Hello World! " + str(i) + "     " for i in range(5)]
        elif m == 2:
            pieces = ["Hi: " + str(random.randrange(0, 0x1000)) + " "
                      for _ in range(100)]
        else:
            raise Error("unknown test mode")
        b_data = " ".join(pieces).encode('utf-8')

    b_data = b_data[:IBSIZE - 4]
    zl_data = zlib.compress(b_data)
    print("From %d to %d bytes" % (len(b_data), len(zl_data)))
    print(zl_data)
    return b_data, zl_data
59
 
60
 
61
class TestDeflate(unittest.TestCase):
    """Round-trip checks of the deflate block against zlib in simulation."""

    def testMain(self):
        """Run the stimulus below once for each of the four test modes."""

        def test_decompress(i_mode, o_done, i_data, o_iprogress,
                            o_oprogress, o_byte, i_addr, clk, reset):
            """Stimulus generator for one test mode.

            Sequence: decompress zlib's output and compare with the plain
            data, compress the plain data with the block, check zlib can
            read that result, then decompress the block's own output and
            compare again.  The mode is read from the enclosing loop.
            """

            def tick():
                clk.next = not clk

            def cycle():
                # Two clock toggles, 5 time units apart each.
                tick()
                yield delay(5)
                tick()
                yield delay(5)

            def start(command):
                # Present a start command for one cycle, then go idle.
                i_mode.next = command
                yield from cycle()
                i_mode.next = IDLE

            def load(payload):
                # Write every payload byte to its own address, one per cycle.
                i_mode.next = WRITE
                for addr, value in enumerate(payload):
                    i_data.next = value
                    i_addr.next = addr
                    yield from cycle()
                i_mode.next = IDLE

            def wait_done():
                # Clock until o_done is set, printing the time before/after.
                print(now())
                while not o_done:
                    yield from cycle()
                print(now())

            def fetch(count):
                # Read `count` bytes back through o_byte, one per cycle.
                i_mode.next = READ
                chunks = []
                for addr in range(count):
                    i_addr.next = addr
                    yield from cycle()
                    chunks.append(bytes([o_byte]))
                i_mode.next = IDLE
                return b''.join(chunks)

            print("")
            print("==========================")
            print("START TEST MODE", mode)
            print("==========================")

            b_data, zl_data = test_data(mode)

            # Drive reset to 0 for one clock toggle, then back to 1.
            reset.next = 0
            tick()
            yield delay(5)
            reset.next = 1
            tick()
            yield delay(5)

            print("STARTD")
            yield from start(STARTD)

            # Stream the compressed input while decompression is running,
            # holding back whenever byte `sent` would violate
            # o_iprogress > sent - MAXW.
            print("WRITE")
            i_mode.next = WRITE
            sent = 0
            total = len(zl_data)
            while sent < total:
                if o_iprogress > sent - MAXW:
                    # print("write", sent)
                    i_data.next = zl_data[sent]
                    i_addr.next = sent
                    sent = sent + 1
                else:
                    print("Wait for space")
                yield from cycle()
            i_mode.next = IDLE
            print("Wrote input, wait for end of decompression")

            # Alternative without sliding window (disabled): write all of
            # zl_data first (as load() does) and only then issue STARTD.

            yield from wait_done()

            print("GOT", o_oprogress)
            d_data = yield from fetch(o_oprogress)

            self.assertEqual(b_data, d_data, "decompress does NOT match")
            print(len(d_data), len(zl_data))

            print("==========COMPRESS TEST=========")

            yield from load(b_data)
            yield from start(STARTC)
            yield from wait_done()

            # raise Error("STOP")

            print("last", o_oprogress)
            c_data = yield from fetch(o_oprogress)

            print("b_data:", len(b_data), b_data)
            print("c_data:", len(c_data), c_data)
            print("zl_data:", len(zl_data), zl_data)

            print("zlib test:", zlib.decompress(c_data))

            print("WRITE COMPRESSED RESULT")
            yield from load(c_data)

            print("STARTD after Compress")
            yield from start(STARTD)
            yield from wait_done()

            d_data = yield from fetch(o_oprogress)

            self.assertEqual(b_data, d_data,
                             "decompress after compress does NOT match")
            print(len(b_data), len(zl_data), len(c_data))

        for _ in range(1):
            # `mode` is read by test_decompress through its closure.
            for mode in range(4):
                self.runTests(test_decompress)

    def runTests(self, test):
        """Create fresh signals, instantiate the design and run `test`.

        `test` is called with the same nine signals as the design and must
        return a generator that Simulation can run next to it.
        """
        i_mode = Signal(intbv(0)[3:])
        o_done = Signal(bool(0))

        i_data = Signal(intbv()[8:])
        o_byte = Signal(intbv()[8:])
        o_iprogress = Signal(intbv()[LBSIZE:])
        o_oprogress = Signal(intbv()[LBSIZE:])
        i_addr = Signal(intbv()[LBSIZE:])

        clk = Signal(bool(0))
        reset = ResetSignal(1, 0, True)

        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                      o_byte, i_addr, clk, reset)
        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                     o_byte, i_addr, clk, reset)

        # traceSignals(dut)
        Simulation(dut, check).run(quiet=1)
279
 
280
 
281
# Width in bits of the `scounter` prescaler inside test_deflate_bench; it is
# also compared against fixed thresholds there.  It is read when the bench is
# instantiated, so module-level code reassigns it before each instantiation.
SLOWDOWN = 1


# Self-checking bench built around the deflate block.
#
# Ports:
#   i_clk   -- clock; generated internally by `clkgen`, which is only part of
#              the returned instances when SLOWDOWN == 1
#   o_led   -- shows the running byte index `tbi`
#   led0_g, led1_b, led2_r -- status LEDs toggled by the state machine
#
# The state machine in `logic`:
#   1. writes zlib-compressed reference data (CDATA) into the block,
#      decompresses it and compares the result with UDATA byte by byte;
#   2. writes UDATA, compresses it, copies the compressed output back to the
#      input, decompresses that and compares with UDATA again.
# A mismatch raises Error("bad result"); on success StopSimulation is raised
# when SLOWDOWN <= 4, otherwise the bench pauses and restarts from RESET.
@block
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):

    u_data, c_data = test_data(1)

    # Reference data as tuples of ints, indexed by `tbi` in the state machine.
    CDATA = tuple(c_data)
    UDATA = tuple(u_data)

    i_mode = Signal(intbv(0)[3:])
    o_done = Signal(bool(0))

    i_data = Signal(intbv()[8:])
    o_byte = Signal(intbv()[8:])
    # Rebinds u_data (UDATA was captured above); not referenced below.
    u_data = Signal(intbv()[8:])
    o_iprogress = Signal(intbv()[LBSIZE:])
    o_oprogress = Signal(intbv()[LBSIZE:])
    # Output length latched when a compress/decompress run reports done.
    resultlen = Signal(intbv()[LBSIZE:])
    i_addr = Signal(intbv()[LBSIZE:])

    reset = ResetSignal(1, 0, True)

    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
                  o_byte, i_addr, i_clk, reset)

    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
                    'FAIL', 'HALT')
    tstate = Signal(tb_state.RESET)

    # Byte index shared by all write/read/verify states.
    tbi = Signal(modbv(0)[15:])
    # Holds one byte read from the output while it is copied to the input.
    copy = Signal(intbv()[8:])

    scounter = Signal(modbv(0)[SLOWDOWN:])
    counter = Signal(modbv(0)[16:])

    # Set for one cycle after an address change before o_byte is sampled.
    wtick = Signal(bool(0))

    # Wrapping pause counter: PAUSE/CPAUSE leave when it returns to 0.
    resume = Signal(modbv(0)[6:])

    @instance
    def clkgen():
        i_clk.next = 0
        while True:
            yield delay(5)
            i_clk.next = not i_clk

    @always(i_clk.posedge)
    def count():
        # o_led.next = counter
        if scounter == 0:
            counter.next = counter + 1
        scounter.next = scounter + 1

    @always(i_clk.posedge)
    def logic():

        if tstate == tb_state.RESET:
            print("RESET", counter)
            reset.next = 0
            led0_g.next = 0
            led1_b.next = 0
            led2_r.next = 0
            tbi.next = 0
            tstate.next = tb_state.START

        # With SLOWDOWN > 2 the state machine only advances on clock edges
        # where the prescaler is 0.
        elif SLOWDOWN > 2 and scounter != 0:
            pass

        elif tstate == tb_state.START:
            # A few cycles reset low
            if tbi < 1:
                # Use the current value like every other counter update in
                # this block (previously read tbi.next on the right-hand
                # side).
                tbi.next = tbi + 1
            else:
                reset.next = 1
                tstate.next = tb_state.WRITE
                tbi.next = 0

        elif tstate == tb_state.HALT:
            led0_g.next = 1
            led2_r.next = 0
            led1_b.next = 0

        elif tstate == tb_state.FAIL:
            # Failure: blink all color leds
            led0_g.next = not led0_g
            led2_r.next = not led2_r
            led1_b.next = o_done

        elif tstate == tb_state.WRITE:
            if tbi < len(CDATA):
                # print(tbi)
                o_led.next = tbi
                led1_b.next = o_done
                led2_r.next = not led2_r
                i_mode.next = WRITE
                i_data.next = CDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                i_mode.next = IDLE
                led2_r.next = 0
                tstate.next = tb_state.DECOMPRESS

        elif tstate == tb_state.DECOMPRESS:
            i_mode.next = STARTD
            tstate.next = tb_state.WAIT

        elif tstate == tb_state.WAIT:
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("result len", o_oprogress)
                # The HALT assignment is overridden by the VERIFY assignment
                # further down in this branch.
                if o_oprogress == 0x4F:
                    tstate.next = tb_state.HALT
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.VERIFY
                """
                if o_oprogress == 0x4F:
                    tstate.next = tb_state.HALT
                else:
                    tstate.next = tb_state.FAIL
                """

        elif tstate == tb_state.VERIFY:
            # print("VERIFY", o_data)
            led1_b.next = 0
            o_led.next = tbi
            """
            Note that the read can also be pipelined in a tight loop
            without the WTICK delay, but this will not work with
            SLOWDOWN > 1
            """
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                led2_r.next = not led2_r
                ud1= UDATA[tbi]
                # print(o_byte, ud1)
                if o_byte != ud1:
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
                    # resume.next = 1
                    # tstate.next = tb_state.PAUSE
                    tstate.next = tb_state.FAIL
                    # tstate.next = tb_state.RESET
                    raise Error("bad result")
                else:
                    pass
                    # print(tbi, o_data)
                i_addr.next = tbi + 1
                tbi.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("DECOMPRESS test OK!, pausing", tbi)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.PAUSE
                # tstate.next = tb_state.HALT
                # state.next = tb_state.CPAUSE
                resume.next = 1
                # tstate.next = tb_state.CWRITE

        elif tstate == tb_state.PAUSE:
            led2_r.next = 0
            if resume == 0:
                print("--------------COMPRESS-------------")
                tbi.next = 0
                led0_g.next = 0
                tstate.next = tb_state.CWRITE
                # tstate.next = tb_state.RESET
            else:
                led2_r.next = not led2_r
                resume.next = resume + 1

        #####################################
        # COMPRESS TEST
        #####################################

        elif tstate == tb_state.CWRITE:
            o_led.next = tbi
            if tbi < len(UDATA):
                # print(tbi)
                led2_r.next = 0
                led1_b.next = not led1_b
                i_mode.next = WRITE
                i_data.next = UDATA[tbi]
                i_addr.next = tbi
                tbi.next = tbi + 1
            else:
                print("wrote bytes to compress", tbi)
                i_mode.next = IDLE
                tstate.next = tb_state.COMPRESS

        elif tstate == tb_state.COMPRESS:
            i_mode.next = STARTC
            tstate.next = tb_state.CWAIT

        elif tstate == tb_state.CWAIT:
            led2_r.next = not led2_r
            if i_mode == STARTC:
                print("WAIT COMPRESS")
                i_mode.next = IDLE
                led1_b.next = 0
            elif o_done:
                print("result len", o_oprogress)
                resultlen.next = o_oprogress
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                tstate.next = tb_state.CRESULT
                wtick.next = True

        # verify compression
        elif tstate == tb_state.CRESULT:
            # print("COPY COMPRESS RESULT", tbi, o_data)
            # Alternates between two kinds of cycle: with wtick clear, issue
            # a READ of address tbi and latch o_byte into `copy`; with wtick
            # set, WRITE `copy` to address tbi - 1 (skipped while tbi is 0)
            # and advance tbi.
            led2_r.next = 0
            o_led.next = tbi
            if wtick:
                if tbi > 0:
                    i_mode.next = WRITE
                    i_data.next = copy
                    i_addr.next = tbi - 1
                wtick.next = False
                tbi.next = tbi + 1
            elif tbi < resultlen:
                i_mode.next = READ
                led1_b.next = not led1_b
                i_addr.next = tbi
                copy.next = o_byte
                wtick.next = True
            else:
                print("Compress output bytes copied to input", resultlen, tbi - 1)
                i_mode.next = IDLE
                tbi.next = 0
                tstate.next = tb_state.VDECOMPRESS

        elif tstate == tb_state.VDECOMPRESS:
            print("start decompress of test compression")
            i_mode.next = STARTD
            tstate.next = tb_state.VWAIT

        elif tstate == tb_state.VWAIT:
            led2_r.next = 0
            led1_b.next = not led1_b
            i_mode.next = IDLE
            if i_mode == IDLE and o_done:
                print("DONE DECOMPRESS VERIFY", o_oprogress)
                tbi.next = 0
                i_addr.next = 0
                i_mode.next = READ
                wtick.next = True
                tstate.next = tb_state.CVERIFY

        elif tstate == tb_state.CVERIFY:
            # print("COMPRESS VERIFY", tbi, o_byte)
            led1_b.next = 0
            led2_r.next = not led2_r
            o_led.next = tbi
            if wtick:
                wtick.next = False
            elif tbi < len(UDATA):
                ud2 = UDATA[tbi]
                # print(tbi, o_byte, ud2)
                if o_byte != ud2:
                    tstate.next = tb_state.RESET
                    i_mode.next = IDLE
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
                    raise Error("bad result")
                    # NOTE(review): the next line is unreachable when run as
                    # Python; presumably kept for the converted HDL -- confirm
                    # before removing.
                    tstate.next = tb_state.FAIL
                tbi.next = tbi + 1
                i_addr.next = tbi + 1
                wtick.next = True
            else:
                print(len(UDATA))
                print("ALL OK!", tbi)
                led2_r.next = 0
                i_mode.next = IDLE
                resume.next = 1
                if SLOWDOWN <= 4:
                    raise StopSimulation()
                """
                """
                tstate.next = tb_state.CPAUSE
                # tstate.next = tb_state.HALT

        elif tstate == tb_state.CPAUSE:
            if resume == 0:
                print("--------------RESET-------------")
                o_led.next = o_led + 1
                tstate.next = tb_state.RESET
            else:
                led0_g.next = not led0_g
                resume.next = resume + 1

        """
        if now() > 50000:
            raise StopSimulation()
        """

    # The internal clock generator is only included when SLOWDOWN == 1.
    if SLOWDOWN == 1:
        return clkgen, dut, count, logic
    else:
        return dut, count, logic
592
 
593
 
594
# --- Conversion with the wide prescaler (SLOWDOWN = 22) --------------------
# SLOWDOWN is read by test_deflate_bench when it is instantiated, so it must
# be assigned before each instantiation.
SLOWDOWN = 22
tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
tb.convert(initial_values=False)

# --- Conversion of the fast bench (SLOWDOWN = 1) ---------------------------
SLOWDOWN = 1
tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
print("convert SLOWDOWN: ", SLOWDOWN)
tb.convert(name="test_fast_bench", initial_values=True)
# Disabled: run the converted fast bench under Icarus Verilog.
# os.system("iverilog -o test_deflate " +
#           "test_fast_bench.v dump.v; " +
#           "vvp test_deflate")

# --- Unit tests --------------------------------------------------------------
print("Start Unit test")
unittest.main(verbosity=2)

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.