OpenCores
URL https://opencores.org/ocsvn/nocmodel/nocmodel/trunk

Subversion Repositories nocmodel

[/] [nocmodel/] [trunk/] [examples/] [stress_test.py] - Blame information for rev 4

Details | Compare with Previous | View Log

Line No. Rev Author Line
1 4 dargor
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
 
4
#
5
# NoCmodel stress test
6
#
7
# Author:  Oscar Diaz
8
# Version: 0.1
9
# Date:    20-05-2011
10
 
11
#
12
# This code is free software; you can redistribute it and/or
13
# modify it under the terms of the GNU Lesser General Public
14
# License as published by the Free Software Foundation; either
15
# version 2.1 of the License, or (at your option) any later version.
16
#
17
# This code is distributed in the hope that it will be useful,
18
# but WITHOUT ANY WARRANTY; without even the implied warranty of
19
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20
# Lesser General Public License for more details.
21
#
22
# You should have received a copy of the GNU Lesser General Public
23
# License along with this library; if not, write to the
24
# Free Software  Foundation, Inc., 59 Temple Place, Suite 330,
25
# Boston, MA  02111-1307  USA
26
#
27
 
28
#
29
# Changelog:
30
#
31
# 20-05-2011 : (OD) initial release
32
#
33
 
34
import myhdl
35
import logging
36
import random
37
import time
38
import os
39
 
40
from nocmodel import *
41
from nocmodel.basicmodels import *
42
 
43
# Stress test: generate a list of random sends and receives through all the NoC.
44
 
45
sim_starttime = 10
46
sim_maxtime = 1000
47
sim_sendtime = 900
48
sim_meanperiod = 30 # fifo full using 13
49
sim_stddevperiod = 3
50
 
51
# 1. Create the model
52
 
53
basicnoc = generate_squarenoc(3, with_ipcore=True)
54
 
55
basicnoc.protocol_ref = basic_protocol()
56
 
57
basicnoc.update_nocdata()
58
 
59
# 2. add tlm support, and configure logging
60
add_tbm_basic_support(basicnoc, log_file="simulation.log", log_level=logging.DEBUG)
61
# 2.1 setup byobject logging
62
if not os.access("/tmp/stress_test_log/", os.F_OK):
63
    os.mkdir("/tmp/stress_test_log/")
64
basicnoc.tbmsim.configure_byobject_logging(basefilename="/tmp/stress_test_log/stress", log_level=logging.DEBUG)
65
 
66
# 3. Prepare random packets
67
def do_checkvect(basicnoc, sim_starttime, sim_sendtime, sim_meanperiod, sim_stddevperiod):
68
    checkvect = {}
69
    testvect = {}
70
    for r in basicnoc.router_list():
71
        # generate the time instants for sending, its destination and packet value
72
        tvec = [abs(int(x + random.gauss(0, sim_stddevperiod))) + sim_starttime for x in range(0, sim_sendtime, sim_meanperiod)]
73
        # delete possible repeated values and sort time instants
74
        tvec = list(set(tvec))
75
        tvec.sort()
76
        #avail_dest = r.routingtable.keys().remove(r.get_address())
77
        avail_dest = r.routes_info.keys()
78
        dest = [random.choice(avail_dest) for x in tvec]
79
        value = [random.randrange(65536) for x in tvec]
80
        # generate a list of "test vectors": (<time>, <dest>, <value>)
81
        r.ipcore_ref.tbm.testvect = zip(tvec, dest, value)
82
        testvect[r.get_address()] = r.ipcore_ref.tbm.testvect
83
        # generate a list of expected values at every router destination
84
        # "check vectors" is: {dest : (<time from src>, <src>, <value>)}
85
        for i, d in enumerate(dest):
86
            if d in checkvect:
87
                checkvect[d].append(tuple([tvec[i], r.get_address(), value[i]]))
88
            else:
89
                checkvect[d] = [tuple([tvec[i], r.get_address(), value[i]])]
90
    # sort each checkvect by time
91
    for vect in checkvect.itervalues():
92
        vect.sort(key=lambda x: x[0])
93
    # put checkvects in each destination router
94
    for r in basicnoc.router_list():
95
        r.ipcore_ref.tbm.checkvect = checkvect[r.get_address()]
96
    with open("vectors.txt", "w") as f:
97
        f.write("testvect = %s\n" % repr(testvect))
98
        f.write("checkvect = %s\n" % repr(checkvect))
99
    #return (testvect, checkvect)
100
 
101
try:
102
    execfile("vectors.txt")
103
    for r in basicnoc.router_list():
104
        r.ipcore_ref.tbm.testvect = testvect[r.get_address()]
105
        r.ipcore_ref.tbm.checkvect = checkvect[r.get_address()]
106
    print "Using vectors.txt for test"
107
except:
108
    do_checkvect(basicnoc, sim_starttime, sim_sendtime, sim_meanperiod, sim_stddevperiod)
109
    print "Created new vectors for test"
110
 
111
# 3. Declare generators to put in the TBM simulation: 
112
# double generators that runs its own test vector and checks the expected results
113
 
114
# set ip_cores functionality as myhdl generators
115
def sourcechkgen(din, dout, tbm_ref):
116
    """
117
    This generator drives dout based on gen_data vectors
118
    and reacts to din to check with chk_data vectors.
119
    """
120
    # prepare data indexes for check vectors
121
    check_dict = {}
122
    for idx, val in enumerate(tbm_ref.checkvect):
123
        check_dict[val[2]] = idx
124
    received_idx = [False]*len(tbm_ref.checkvect)
125
 
126
    @myhdl.instance
127
    def datagen():
128
        datacount = 0
129
        protocol_ref = tbm_ref.ipcore_ref.get_protocol_ref()
130
        mysrc = tbm_ref.ipcore_ref.get_address()
131
        tbm_ref.debug("sourcechkgen.datagen: init dout is %s" % repr(dout.val))
132
        gen_data = tbm_ref.testvect
133
 
134
        for test_vector in gen_data:
135
            # test_vector : (<time>, <dest>, <value>)
136
            next_delay = test_vector[0] - myhdl.now()
137
            yield myhdl.delay(next_delay)
138
            dout.next = protocol_ref.newpacket(False, mysrc, test_vector[1], test_vector[2])
139
            tbm_ref.debug("sourcechkgen.datagen: sent test vector <%s>, dout is %s" % (repr(test_vector), repr(dout.val)))
140
        # wait for end of simulation
141
        tbm_ref.debug("sourcechkgen.datagen: test vectors exhausted. Going idle.")
142
        # TODO: check an easy way to go idle in MyHDL
143
        #next_delay = sim_maxtime - myhdl.now()
144
        #yield myhdl.delay(next_delay)
145
        #raise myhdl.StopSimulation("sourcechkgen.datagen: End of simulation")
146
        return
147
 
148
    @myhdl.instance
149
    def datacheck():
150
        protocol_ref = tbm_ref.ipcore_ref.get_protocol_ref()
151
        mydest = tbm_ref.ipcore_ref.get_address()
152
        check_data = tbm_ref.checkvect
153
        while True:
154
            # just check for data reception
155
            yield din
156
            # check packet
157
            inpacket = din.val
158
            # search checkvect by data payload
159
            chkidx = check_dict.get(inpacket["data"])
160
            if chkidx is None:
161
                tbm_ref.error("sourcechkgen.datacheck: unexpected packet : %s" % repr(inpacket))
162
            else:
163
                # check vectors : (<time from src>, <src>, <value>)
164
                expected_vector = check_data[chkidx]
165
                # received packet: report it
166
                received_idx[chkidx] = True
167
                # data was checked before. Inform about packet delay
168
                tbm_ref.debug("sourcechkgen.datacheck: (delay %d) packet received : <%s>, expected src=<%s>, value=<%s> " % (myhdl.now() - expected_vector[0], repr(inpacket), repr(expected_vector[1]), repr(expected_vector[2])))
169
                if inpacket["src"] != expected_vector[1]:
170
                    tbm_ref.error("sourcechkgen.datacheck: source address != %d (%d)" % (expected_vector[1], inpacket["src"]))
171
                if inpacket["dst"] != mydest:
172
                    tbm_ref.error("sourcechkgen.datacheck: destination address != %d (%d)" % (mydest, inpacket["dst"]))
173
 
174
    @myhdl.instance
175
    def finalcheck():
176
        while True:
177
            yield myhdl.delay(sim_maxtime - 1)
178
            # final check: missing packets
179
            tbm_ref.debug("sourcechkgen.finalcheck: check for missing packets.")
180
            if not all(received_idx):
181
                # missing packets:
182
                miss_count = len(received_idx) - sum(received_idx)
183
                tbm_ref.error("sourcechkgen.finalcheck: there are %d missing packets!" % miss_count)
184
                for idx, val in enumerate(tbm_ref.checkvect):
185
                    if not received_idx[idx]:
186
                        tbm_ref.debug("sourcechkgen.finalcheck: missing packet: <%s>" % repr(val))
187
 
188
 
189
    return (datagen, datacheck, finalcheck)
190
 
191
# 5. assign generators to ip cores (in TLM model !)
192
for r in basicnoc.router_list():
193
    r.ipcore_ref.tbm.register_generator(sourcechkgen)
194
 
195
# 6. configure simulation and run!
196
basicnoc.tbmsim.configure_simulation(max_time=sim_maxtime)
197
print "Starting simulation..."
198
runsecs = time.clock()
199
basicnoc.tbmsim.run()
200
runsecs = time.clock() - runsecs
201
print "Simulation finished in %f secs. Pick the results in log files." % runsecs
202
 
203
# 7. View graphical representation
204
 
205
draw_noc(basicnoc)

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.