OpenCores
URL https://opencores.org/ocsvn/hdl-deflate/hdl-deflate/trunk

Subversion Repositories hdl-deflate

[/] [hdl-deflate/] [trunk/] [test_deflate.py] - Blame information for rev 4

Go to most recent revision | Details | Compare with Previous | View Log

Line No. Rev Author Line
1 2 tomtor
import unittest
2
import os
3
import zlib
4
import random
5
 
6
from myhdl import delay, now, Signal, intbv, ResetSignal, Simulation, \
7
                  Cosimulation, block, instance, StopSimulation, modbv, \
8
                  always, always_seq, always_comb, enum, Error
9
 
10
from deflate import IDLE, WRITE, READ, STARTC, STARTD, LBSIZE, IBSIZE, CWINDOW
11
 
12
MAXW = 2 * CWINDOW
13
 
14
COSIMULATION = True
15
COSIMULATION = False
16
 
17
if not COSIMULATION:
18
    from deflate import deflate
19
else:
20
    def deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
21
                o_byte, i_addr, clk, reset):
22
        print("Cosimulation")
23
        cmd = "iverilog -o deflate " + \
24
              "deflate.v " + \
25
              "tb_deflate.v "  # "dump.v "
26
        os.system(cmd)
27
        return Cosimulation("vvp -m ./myhdl deflate",
28
                            i_mode=i_mode, o_done=o_done,
29
                            i_data=i_data, o_iprogress=o_iprogress,
30
                            o_oprogress=o_oprogress,
31
                            o_byte=o_byte, i_addr=i_addr,
32
                            clk=clk, reset=reset)
33
 
34
 
35
def test_data(m):
36
    print("MODE", m)
37
    if m == 0:
38
        str_data = " ".join(["Hello World! " + str(1) + " "
39
                             for i in range(100)])
40
        b_data = str_data.encode('utf-8')
41
    elif m == 1:
42
        str_data = " ".join(["   Hello World! " + str(i) + "     "
43
        # str_data = " ".join(["Hello World! " + str(i) + " "
44
                             for i in range(5)])
45
        b_data = str_data.encode('utf-8')
46
    elif m == 2:
47
        str_data = " ".join(["Hi: " + str(random.randrange(0,0x1000)) + " "
48
                             for i in range(100)])
49
        b_data = str_data.encode('utf-8')
50
    elif m == 3:
51
        b_data = bytes([random.randrange(0,0x100) for i in range(100)])
52 4 tomtor
    elif m == 4:
53
        str_data = "".join([str(random.randrange(0,2))
54
                             for i in range(1000)])
55
        b_data = str_data.encode('utf-8')
56 2 tomtor
    else:
57
        raise Error("unknown test mode")
58
    b_data = b_data[:IBSIZE-4]
59
    zl_data = zlib.compress(b_data)
60
    print("From %d to %d bytes" % (len(b_data), len(zl_data)))
61
    print(zl_data)
62
    return b_data, zl_data
63
 
64
 
65
class TestDeflate(unittest.TestCase):
66
 
67
    def testMain(self):
68
 
69
        def test_decompress(i_mode, o_done, i_data, o_iprogress,
70
                            o_oprogress, o_byte, i_addr, clk, reset):
71
 
72
            def tick():
73
                clk.next = not clk
74
 
75
            print("")
76
            print("==========================")
77
            print("START TEST MODE", mode)
78
            print("==========================")
79
 
80
            b_data, zl_data = test_data(mode)
81
 
82
            reset.next = 0
83
            tick()
84
            yield delay(5)
85
            reset.next = 1
86
            tick()
87
            yield delay(5)
88
 
89
            print("STARTD")
90
            i_mode.next = STARTD
91
            tick()
92
            yield delay(5)
93
            tick()
94
            yield delay(5)
95
            i_mode.next = IDLE
96
 
97
            print("WRITE")
98
            i_mode.next = WRITE
99
            i = 0
100
            while i < len(zl_data):
101
                if o_iprogress > i - MAXW:
102
                    # print("write", i)
103
                    i_data.next = zl_data[i]
104
                    i_addr.next = i
105
                    i = i + 1
106
                else:
107
                    print("Wait for space")
108
                tick()
109
                yield delay(5)
110
                tick()
111
                yield delay(5)
112
            i_mode.next = IDLE
113
            print("Wrote input, wait for end of decompression")
114
 
115
            # alternative without sliding window:
116
            """
117
            print("WRITE")
118
            i_mode.next = WRITE
119
            for i in range(len(zl_data)):
120
                i_data.next = zl_data[i]
121
                i_addr.next = i
122
                tick()
123
                yield delay(5)
124
                tick()
125
                yield delay(5)
126
            i_mode.next = IDLE
127
 
128
            print("STARTD")
129
            i_mode.next = STARTD
130
            tick()
131
            yield delay(5)
132
            tick()
133
            yield delay(5)
134
            i_mode.next = IDLE
135
            """
136
 
137
            print(now())
138
            while not o_done:
139
                tick()
140
                yield delay(5)
141
                tick()
142
                yield delay(5)
143
            print(now())
144
 
145
            last = o_oprogress
146
            print("GOT", last)
147
            i_mode.next = READ
148
            d_data = []
149
            for i in range(last):
150
                i_addr.next = i
151
                tick()
152
                yield delay(5)
153
                tick()
154
                yield delay(5)
155
                d_data.append(bytes([o_byte]))
156
            i_mode.next = IDLE
157
 
158
            d_data = b''.join(d_data)
159
 
160
            self.assertEqual(b_data, d_data, "decompress does NOT match")
161
            print(len(d_data), len(zl_data))
162
 
163
            print("==========COMPRESS TEST=========")
164
 
165
            i_mode.next = WRITE
166
            for i in range(len(b_data)):
167
                i_data.next = b_data[i]
168
                i_addr.next = i
169
                tick()
170
                yield delay(5)
171
                tick()
172
                yield delay(5)
173
            i_mode.next = IDLE
174
 
175
            i_mode.next = STARTC
176
            tick()
177
            yield delay(5)
178
            tick()
179
            yield delay(5)
180
            i_mode.next = IDLE
181
 
182
            print(now())
183
            while not o_done:
184
                tick()
185
                yield delay(5)
186
                tick()
187
                yield delay(5)
188
            print(now())
189
 
190
            # raise Error("STOP")
191
 
192
            last = o_oprogress
193
            print("last", last)
194
            i_mode.next = READ
195
            c_data = []
196
            for i in range(last):
197
                i_addr.next = i
198
                tick()
199
                yield delay(5)
200
                tick()
201
                yield delay(5)
202
                c_data.append(bytes([o_byte]))
203
            i_mode.next = IDLE
204
 
205
            print("b_data:", len(b_data), b_data)
206
            c_data = b''.join(c_data)
207
            print("c_data:", len(c_data), c_data)
208
            print("zl_data:", len(zl_data), zl_data)
209
 
210
            print("zlib test:", zlib.decompress(c_data))
211
 
212
            print("WRITE COMPRESSED RESULT")
213
            i_mode.next = WRITE
214
            for i in range(len(c_data)):
215
                i_data.next = c_data[i]
216
                i_addr.next = i
217
                tick()
218
                yield delay(5)
219
                tick()
220
                yield delay(5)
221
            i_mode.next = IDLE
222
 
223
            print("STARTD after Compress")
224
            i_mode.next = STARTD
225
            tick()
226
            yield delay(5)
227
            tick()
228
            yield delay(5)
229
            i_mode.next = IDLE
230
 
231
            print(now())
232
            while not o_done:
233
                tick()
234
                yield delay(5)
235
                tick()
236
                yield delay(5)
237
            print(now())
238
 
239
            last = o_oprogress
240
            i_mode.next = READ
241
            d_data = []
242
            for i in range(last):
243
                i_addr.next = i
244
                tick()
245
                yield delay(5)
246
                tick()
247
                yield delay(5)
248
                d_data.append(bytes([o_byte]))
249
            i_mode.next = IDLE
250
 
251
            d_data = b''.join(d_data)
252
 
253
            self.assertEqual(b_data, d_data, "decompress after compress does NOT match")
254
            print(len(b_data), len(zl_data), len(c_data))
255
 
256
        for loop in range(1):
257 4 tomtor
            for mode in range(5):
258 2 tomtor
                self.runTests(test_decompress)
259
 
260
    def runTests(self, test):
261
        """Helper method to run the actual tests."""
262
 
263
        i_mode = Signal(intbv(0)[3:])
264
        o_done = Signal(bool(0))
265
 
266
        i_data = Signal(intbv()[8:])
267
        o_byte = Signal(intbv()[8:])
268
        o_iprogress = Signal(intbv()[LBSIZE:])
269
        o_oprogress = Signal(intbv()[LBSIZE:])
270
        i_addr = Signal(intbv()[LBSIZE:])
271
 
272
        clk = Signal(bool(0))
273
        reset = ResetSignal(1, 0, True)
274
 
275
        dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
276
                      o_byte, i_addr, clk, reset)
277
 
278
        check = test(i_mode, o_done, i_data, o_iprogress, o_oprogress,
279
                     o_byte, i_addr, clk, reset)
280
        sim = Simulation(dut, check)
281
        # traceSignals(dut)
282
        sim.run(quiet=1)
283
 
284
 
285
SLOWDOWN = 1
286
 
287
@block
288
def test_deflate_bench(i_clk, o_led, led0_g, led1_b, led2_r):
289
 
290
    u_data, c_data = test_data(1)
291
 
292
    CDATA = tuple(c_data)
293
    UDATA = tuple(u_data)
294
 
295
    i_mode = Signal(intbv(0)[3:])
296
    o_done = Signal(bool(0))
297
 
298
    i_data = Signal(intbv()[8:])
299
    o_byte = Signal(intbv()[8:])
300
    u_data = Signal(intbv()[8:])
301
    o_iprogress = Signal(intbv()[LBSIZE:])
302
    o_oprogress = Signal(intbv()[LBSIZE:])
303
    resultlen = Signal(intbv()[LBSIZE:])
304
    i_addr = Signal(intbv()[LBSIZE:])
305
 
306
    reset = ResetSignal(1, 0, True)
307
 
308
    dut = deflate(i_mode, o_done, i_data, o_iprogress, o_oprogress,
309
                  o_byte, i_addr, i_clk, reset)
310
 
311
    tb_state = enum('RESET', 'START', 'WRITE', 'DECOMPRESS', 'WAIT', 'VERIFY',
312
                    'PAUSE', 'CWRITE', 'COMPRESS', 'CWAIT', 'CRESULT',
313
                    'VWRITE', 'VDECOMPRESS', 'VWAIT', 'CVERIFY', 'CPAUSE',
314
                    'FAIL', 'HALT')
315
    tstate = Signal(tb_state.RESET)
316
 
317
    tbi = Signal(modbv(0)[15:])
318
    copy = Signal(intbv()[8:])
319
 
320
    scounter = Signal(modbv(0)[SLOWDOWN:])
321
    counter = Signal(modbv(0)[16:])
322
 
323
    wtick = Signal(bool(0))
324
 
325
    resume = Signal(modbv(0)[6:])
326
 
327
    @instance
328
    def clkgen():
329
        i_clk.next = 0
330
        while True:
331
            yield delay(5)
332
            i_clk.next = not i_clk
333
 
334
    @always(i_clk.posedge)
335
    def count():
336
        # o_led.next = counter
337
        if scounter == 0:
338
            counter.next = counter + 1
339
        scounter.next = scounter + 1
340
 
341
    @always(i_clk.posedge)
342
    def logic():
343
 
344
        if tstate == tb_state.RESET:
345
            print("RESET", counter)
346
            reset.next = 0
347
            led0_g.next = 0
348
            led1_b.next = 0
349
            led2_r.next = 0
350
            tbi.next = 0
351
            tstate.next = tb_state.START
352
 
353
        elif SLOWDOWN > 2 and scounter != 0:
354
            pass
355
 
356
        elif tstate == tb_state.START:
357
            # A few cycles reset low
358
            if tbi < 1:
359
                tbi.next = tbi.next + 1
360
            else:
361
                reset.next = 1
362
                tstate.next = tb_state.WRITE
363
                tbi.next = 0
364
 
365
        elif tstate == tb_state.HALT:
366
            led0_g.next = 1
367
            led2_r.next = 0
368
            led1_b.next = 0
369
 
370
        elif tstate == tb_state.FAIL:
371
            # Failure: blink all color leds
372
            led0_g.next = not led0_g
373
            led2_r.next = not led2_r
374
            led1_b.next = o_done
375
 
376
        elif tstate == tb_state.WRITE:
377
            if tbi < len(CDATA):
378
                # print(tbi)
379
                o_led.next = tbi
380
                led1_b.next = o_done
381
                led2_r.next = not led2_r
382
                i_mode.next = WRITE
383
                i_data.next = CDATA[tbi]
384
                i_addr.next = tbi
385
                tbi.next = tbi + 1
386
            else:
387
                i_mode.next = IDLE
388
                led2_r.next = 0
389
                tstate.next = tb_state.DECOMPRESS
390
 
391
        elif tstate == tb_state.DECOMPRESS:
392
            i_mode.next = STARTD
393
            tstate.next = tb_state.WAIT
394
 
395
        elif tstate == tb_state.WAIT:
396
            led1_b.next = not led1_b
397
            i_mode.next = IDLE
398
            if i_mode == IDLE and o_done:
399
                print("result len", o_oprogress)
400
                if o_oprogress == 0x4F:
401
                    tstate.next = tb_state.HALT
402
                resultlen.next = o_oprogress
403
                tbi.next = 0
404
                i_addr.next = 0
405
                i_mode.next = READ
406
                wtick.next = True
407
                tstate.next = tb_state.VERIFY
408
                """
409
                if o_oprogress == 0x4F:
410
                    tstate.next = tb_state.HALT
411
                else:
412
                    tstate.next = tb_state.FAIL
413
                """
414
 
415
        elif tstate == tb_state.VERIFY:
416
            # print("VERIFY", o_data)
417
            led1_b.next = 0
418
            o_led.next = tbi
419
            """
420
            Note that the read can also be pipelined in a tight loop
421
            without the WTICK delay, but this will not work with
422
            SLOWDOWN > 1
423
            """
424
            if wtick:
425
                wtick.next = False
426
            elif tbi < len(UDATA):
427
                led2_r.next = not led2_r
428
                ud1= UDATA[tbi]
429
                # print(o_byte, ud1)
430
                if o_byte != ud1:
431
                    i_mode.next = IDLE
432
                    print("FAIL", len(UDATA), tbi, o_byte, ud1)
433
                    # resume.next = 1
434
                    # tstate.next = tb_state.PAUSE
435
                    tstate.next = tb_state.FAIL
436
                    # tstate.next = tb_state.RESET
437
                    raise Error("bad result")
438
                else:
439
                    pass
440
                    # print(tbi, o_data)
441
                i_addr.next = tbi + 1
442
                tbi.next = tbi + 1
443
                wtick.next = True
444
            else:
445
                print(len(UDATA))
446
                print("DECOMPRESS test OK!, pausing", tbi)
447
                i_mode.next = IDLE
448
                tbi.next = 0
449
                tstate.next = tb_state.PAUSE
450
                # tstate.next = tb_state.HALT
451
                # state.next = tb_state.CPAUSE
452
                resume.next = 1
453
                # tstate.next = tb_state.CWRITE
454
 
455
        elif tstate == tb_state.PAUSE:
456
            led2_r.next = 0
457
            if resume == 0:
458
                print("--------------COMPRESS-------------")
459
                tbi.next = 0
460
                led0_g.next = 0
461
                tstate.next = tb_state.CWRITE
462
                # tstate.next = tb_state.RESET
463
            else:
464
                led2_r.next = not led2_r
465
                resume.next = resume + 1
466
 
467
        #####################################
468
        # COMPRESS TEST
469
        #####################################
470
 
471
        elif tstate == tb_state.CWRITE:
472
            o_led.next = tbi
473
            if tbi < len(UDATA):
474
                # print(tbi)
475
                led2_r.next = 0
476
                led1_b.next = not led1_b
477
                i_mode.next = WRITE
478
                i_data.next = UDATA[tbi]
479
                i_addr.next = tbi
480
                tbi.next = tbi + 1
481
            else:
482
                print("wrote bytes to compress", tbi)
483
                i_mode.next = IDLE
484
                tstate.next = tb_state.COMPRESS
485
 
486
        elif tstate == tb_state.COMPRESS:
487
            i_mode.next = STARTC
488
            tstate.next = tb_state.CWAIT
489
 
490
        elif tstate == tb_state.CWAIT:
491
            led2_r.next = not led2_r
492
            if i_mode == STARTC:
493
                print("WAIT COMPRESS")
494
                i_mode.next = IDLE
495
                led1_b.next = 0
496
            elif o_done:
497
                print("result len", o_oprogress)
498
                resultlen.next = o_oprogress
499
                tbi.next = 0
500
                i_addr.next = 0
501
                i_mode.next = READ
502
                tstate.next = tb_state.CRESULT
503
                wtick.next = True
504
 
505
        # verify compression
506
        elif tstate == tb_state.CRESULT:
507
            # print("COPY COMPRESS RESULT", tbi, o_data)
508
            led2_r.next = 0
509
            o_led.next = tbi
510
            if wtick:
511
                if tbi > 0:
512
                    i_mode.next = WRITE
513
                    i_data.next = copy
514
                    i_addr.next = tbi - 1
515
                wtick.next = False
516
                tbi.next = tbi + 1
517
            elif tbi < resultlen:
518
                i_mode.next = READ
519
                led1_b.next = not led1_b
520
                i_addr.next = tbi
521
                copy.next = o_byte
522
                wtick.next = True
523
            else:
524
                print("Compress output bytes copied to input", resultlen, tbi - 1)
525
                i_mode.next = IDLE
526
                tbi.next = 0
527
                tstate.next = tb_state.VDECOMPRESS
528
 
529
        elif tstate == tb_state.VDECOMPRESS:
530
            print("start decompress of test compression")
531
            i_mode.next = STARTD
532
            tstate.next = tb_state.VWAIT
533
 
534
        elif tstate == tb_state.VWAIT:
535
            led2_r.next = 0
536
            led1_b.next = not led1_b
537
            i_mode.next = IDLE
538
            if i_mode == IDLE and o_done:
539
                print("DONE DECOMPRESS VERIFY", o_oprogress)
540
                tbi.next = 0
541
                i_addr.next = 0
542
                i_mode.next = READ
543
                wtick.next = True
544
                tstate.next = tb_state.CVERIFY
545
 
546
        elif tstate == tb_state.CVERIFY:
547
            # print("COMPRESS VERIFY", tbi, o_byte)
548
            led1_b.next = 0
549
            led2_r.next = not led2_r
550
            o_led.next = tbi
551
            if wtick:
552
                wtick.next = False
553
            elif tbi < len(UDATA):
554
                ud2 = UDATA[tbi]
555
                # print(tbi, o_byte, ud2)
556
                if o_byte != ud2:
557
                    tstate.next = tb_state.RESET
558
                    i_mode.next = IDLE
559
                    print("FAIL", len(UDATA), tbi, ud2, o_byte)
560
                    raise Error("bad result")
561
                    tstate.next = tb_state.FAIL
562
                tbi.next = tbi + 1
563
                i_addr.next = tbi + 1
564
                wtick.next = True
565
            else:
566
                print(len(UDATA))
567
                print("ALL OK!", tbi)
568
                led2_r.next = 0
569
                i_mode.next = IDLE
570
                resume.next = 1
571
                if SLOWDOWN <= 4:
572
                    raise StopSimulation()
573
                """
574
                """
575
                tstate.next = tb_state.CPAUSE
576
                # tstate.next = tb_state.HALT
577
 
578
        elif tstate == tb_state.CPAUSE:
579
            if resume == 0:
580
                print("--------------RESET-------------")
581
                o_led.next = o_led + 1
582
                tstate.next = tb_state.RESET
583
            else:
584
                led0_g.next = not led0_g
585
                resume.next = resume + 1
586
 
587
        """
588
        if now() > 50000:
589
            raise StopSimulation()
590
        """
591
 
592
    if SLOWDOWN == 1:
593
        return clkgen, dut, count, logic
594
    else:
595
        return dut, count, logic
596
 
597
 
598
if 1: # not COSIMULATION:
599
    SLOWDOWN = 22
600
 
601
    tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
602
                        Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
603
 
604
    tb.convert(initial_values=False)
605
 
606
if 1:
607
    SLOWDOWN = 1
608
    tb = test_deflate_bench(Signal(bool(0)), Signal(intbv(0)[4:]),
609
                            Signal(bool(0)), Signal(bool(0)), Signal(bool(0)))
610
    print("convert SLOWDOWN: ", SLOWDOWN)
611
    tb.convert(name="test_fast_bench", initial_values=True)
612
    """
613
    os.system("iverilog -o test_deflate " +
614
              "test_fast_bench.v dump.v; " +
615
              "vvp test_deflate")
616
              """
617
if 1:
618
    print("Start Unit test")
619
    unittest.main(verbosity=2)

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.