OpenCores
URL https://opencores.org/ocsvn/nocmodel/nocmodel/trunk

Subversion Repositories nocmodel

[/] [nocmodel/] [trunk/] [examples/] [stress_test.py] - Rev 4

Compare with Previous | Blame | View Log

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
#
# NoCmodel stress test
#
# Author:  Oscar Diaz
# Version: 0.1
# Date:    20-05-2011
 
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software  Foundation, Inc., 59 Temple Place, Suite 330,
# Boston, MA  02111-1307  USA
#
 
#
# Changelog:
#
# 20-05-2011 : (OD) initial release
#
 
import myhdl
import logging
import random
import time
import os
 
from nocmodel import *
from nocmodel.basicmodels import *
 
# Stress test: generate lists of random packet sends, and the matching expected receptions, across the whole NoC.
 
# Simulation parameters (time values are in simulation time units)
sim_starttime = 10      # offset added to every generated send instant
sim_maxtime = 1000      # simulation length passed to configure_simulation()
sim_sendtime = 900      # end of the range over which send instants are spread
sim_meanperiod = 30     # step between nominal send instants (fifo full using 13)
sim_stddevperiod = 3    # standard deviation of the gaussian jitter on each instant

# 1. Create the model

basicnoc = generate_squarenoc(3, with_ipcore=True)

basicnoc.protocol_ref = basic_protocol()

basicnoc.update_nocdata()

# 2. add TBM support, and configure logging
add_tbm_basic_support(basicnoc, log_file="simulation.log", log_level=logging.DEBUG)
# 2.1 setup byobject logging
log_dir = "/tmp/stress_test_log/"
try:
    # Create first and inspect the failure afterwards: a separate
    # "exists?" test followed by mkdir can race with another process.
    os.mkdir(log_dir)
except OSError:
    # An already existing directory is fine; anything else (no permission,
    # a plain file in the way, ...) is a real error and must surface.
    if not os.path.isdir(log_dir):
        raise
basicnoc.tbmsim.configure_byobject_logging(basefilename=os.path.join(log_dir, "stress"), log_level=logging.DEBUG)
 
# 3. Prepare random packets
def do_checkvect(basicnoc, sim_starttime, sim_sendtime, sim_meanperiod, sim_stddevperiod):
    """
    Generate random test vectors and their matching check vectors for
    every router in basicnoc, attach them to the routers' ipcores and
    save them to "vectors.txt" in the current directory.

    Arguments:
    * basicnoc: NoC model; only router_list() is used. Each router must
      provide get_address(), routes_info (a dict whose keys are the
      candidate destinations) and ipcore_ref.tbm.
    * sim_starttime: offset added to every send instant.
    * sim_sendtime, sim_meanperiod: the nominal send instants are
      range(0, sim_sendtime, sim_meanperiod).
    * sim_stddevperiod: standard deviation of the gaussian jitter added
      to each nominal instant.

    Results (nothing is returned):
    * r.ipcore_ref.tbm.testvect: list of (<time>, <dest>, <value>),
      sorted by time.
    * r.ipcore_ref.tbm.checkvect: list of (<time from src>, <src>, <value>),
      sorted by time. A router that was never picked as destination gets
      an empty list.
    * "vectors.txt": the two dictionaries, keyed by router address,
      written as "testvect = ..." and "checkvect = ..." lines.
    """
    checkvect = {}
    testvect = {}
    routers = list(basicnoc.router_list())
    for r in routers:
        src = r.get_address()
        # Jittered send instants. The set drops repeated instants, and
        # sorting puts the vectors in sending order.
        tvec = sorted(set(
            abs(int(x + random.gauss(0, sim_stddevperiod))) + sim_starttime
            for x in range(0, sim_sendtime, sim_meanperiod)))
        # NOTE(review): every key of routes_info is a candidate; whether
        # the router's own address can appear there is not visible here.
        # list() keeps random.choice() working when keys() is a view.
        avail_dest = list(r.routes_info.keys())
        dest = [random.choice(avail_dest) for _ in tvec]
        value = [random.randrange(65536) for _ in tvec]
        # "test vectors": (<time>, <dest>, <value>). Materialized as a list
        # so it can be iterated more than once and written out with repr().
        vectors = list(zip(tvec, dest, value))
        r.ipcore_ref.tbm.testvect = vectors
        testvect[src] = vectors
        # "check vectors": {dest : [(<time from src>, <src>, <value>), ...]}
        for t, d, v in vectors:
            checkvect.setdefault(d, []).append((t, src, v))
    # sort each checkvect by time
    for vect in checkvect.values():
        vect.sort(key=lambda x: x[0])
    # Put checkvects in each destination router. setdefault() gives an empty
    # list to a router nobody sends to (instead of raising KeyError) and
    # also records that entry in the saved dictionary.
    for r in routers:
        r.ipcore_ref.tbm.checkvect = checkvect.setdefault(r.get_address(), [])
    with open("vectors.txt", "w") as f:
        f.write("testvect = %s\n" % repr(testvect))
        f.write("checkvect = %s\n" % repr(checkvect))
 
# Reuse the vectors stored in vectors.txt when they can be loaded and cover
# every router; otherwise generate (and save) a new random set.
# NOTE(review): execfile() runs vectors.txt as Python code inside this
# module's namespace, and only exists on Python 2. Only keep a trusted
# vectors.txt in the working directory.
try:
    execfile("vectors.txt")
    for r in basicnoc.router_list():
        r.ipcore_ref.tbm.testvect = testvect[r.get_address()]
        r.ipcore_ref.tbm.checkvect = checkvect[r.get_address()]
    print("Using vectors.txt for test")
except Exception:
    # Missing, malformed or non-matching vectors file: fall back to new
    # vectors. Unlike a bare "except:", this lets KeyboardInterrupt and
    # SystemExit through, so the script can still be aborted here.
    do_checkvect(basicnoc, sim_starttime, sim_sendtime, sim_meanperiod, sim_stddevperiod)
    print("Created new vectors for test")
 
# 4. Declare the generators to put in the TBM simulation:
# paired generators that each run their own test vectors and check the expected results
 
# set ip_cores functionality as myhdl generators
def sourcechkgen(din, dout, tbm_ref):
    """
    Build the MyHDL generators that make one ipcore act both as packet
    source and as packet checker.

    Arguments:
    * din: input object; datacheck waits on it and reads the received
      packet from din.val.
    * dout: output object; datagen sends a packet by assigning dout.next.
    * tbm_ref: ipcore model. It must provide testvect, a list of
      (<time>, <dest>, <value>); checkvect, a list of
      (<time from src>, <src>, <value>); ipcore_ref; debug() and error().

    Returns the tuple (datagen, datacheck, finalcheck):
    * datagen sends every test vector at its time instant.
    * datacheck matches each received packet against the check vectors
      and reports unexpected packets and wrong addresses.
    * finalcheck reports, at time sim_maxtime - 1, the check vectors
      that never arrived.
    """
    # Index the check vectors by payload. A payload maps to a *list* of
    # indexes because the random values may repeat; with a single index per
    # payload the earlier vector would be shadowed and then be reported as
    # missing even though its packet arrived.
    check_dict = {}
    for idx, val in enumerate(tbm_ref.checkvect):
        check_dict.setdefault(val[2], []).append(idx)
    received_idx = [False]*len(tbm_ref.checkvect)

    def find_check_index(check_data, inpacket):
        """
        Return the index of the check vector matching inpacket, or None
        when no check vector carries its payload.
        """
        candidates = check_dict.get(inpacket["data"])
        if not candidates:
            return None
        fallback = None
        for idx in candidates:
            if received_idx[idx]:
                continue
            # best match: not yet received and coming from the expected source
            if check_data[idx][1] == inpacket["src"]:
                return idx
            if fallback is None:
                fallback = idx
        if fallback is None:
            # Every candidate was already received (repeated packet): fall
            # back to the last index, as a single-entry index would do.
            fallback = candidates[-1]
        return fallback

    @myhdl.instance
    def datagen():
        protocol_ref = tbm_ref.ipcore_ref.get_protocol_ref()
        mysrc = tbm_ref.ipcore_ref.get_address()
        tbm_ref.debug("sourcechkgen.datagen: init dout is %s" % repr(dout.val))
        gen_data = tbm_ref.testvect

        for test_vector in gen_data:
            # test_vector : (<time>, <dest>, <value>)
            # wait until the absolute time instant of this vector
            next_delay = test_vector[0] - myhdl.now()
            yield myhdl.delay(next_delay)
            dout.next = protocol_ref.newpacket(False, mysrc, test_vector[1], test_vector[2])
            tbm_ref.debug("sourcechkgen.datagen: sent test vector <%s>, dout is %s" % (repr(test_vector), repr(dout.val)))
        # Nothing left to send: falling off the end finishes this generator.
        tbm_ref.debug("sourcechkgen.datagen: test vectors exhausted. Going idle.")
        # TODO: check an easy way to go idle in MyHDL

    @myhdl.instance
    def datacheck():
        mydest = tbm_ref.ipcore_ref.get_address()
        check_data = tbm_ref.checkvect
        while True:
            # just check for data reception
            yield din
            # check packet
            inpacket = din.val
            # search checkvect by data payload (and source, when repeated)
            chkidx = find_check_index(check_data, inpacket)
            if chkidx is None:
                tbm_ref.error("sourcechkgen.datacheck: unexpected packet : %s" % repr(inpacket))
            else:
                # check vectors : (<time from src>, <src>, <value>)
                expected_vector = check_data[chkidx]
                # received packet: report it
                received_idx[chkidx] = True
                # The payload already matched. Inform about packet delay
                tbm_ref.debug("sourcechkgen.datacheck: (delay %d) packet received : <%s>, expected src=<%s>, value=<%s> " % (myhdl.now() - expected_vector[0], repr(inpacket), repr(expected_vector[1]), repr(expected_vector[2])))
                if inpacket["src"] != expected_vector[1]:
                    tbm_ref.error("sourcechkgen.datacheck: source address != %d (%d)" % (expected_vector[1], inpacket["src"]))
                if inpacket["dst"] != mydest:
                    tbm_ref.error("sourcechkgen.datacheck: destination address != %d (%d)" % (mydest, inpacket["dst"]))

    @myhdl.instance
    def finalcheck():
        while True:
            # sim_maxtime is the module-level simulation length
            yield myhdl.delay(sim_maxtime - 1)
            # final check: missing packets
            tbm_ref.debug("sourcechkgen.finalcheck: check for missing packets.")
            if not all(received_idx):
                # missing packets:
                miss_count = len(received_idx) - sum(received_idx)
                tbm_ref.error("sourcechkgen.finalcheck: there are %d missing packets!" % miss_count)
                for idx, val in enumerate(tbm_ref.checkvect):
                    if not received_idx[idx]:
                        tbm_ref.debug("sourcechkgen.finalcheck: missing packet: <%s>" % repr(val))

    return (datagen, datacheck, finalcheck)
 
# 5. Register the source/check generators on every ipcore (TBM model)
for router in basicnoc.router_list():
    router.ipcore_ref.tbm.register_generator(sourcechkgen)

# 6. Configure the simulation length, run it and time the run
basicnoc.tbmsim.configure_simulation(max_time=sim_maxtime)
print("Starting simulation...")
clock_start = time.clock()
basicnoc.tbmsim.run()
runsecs = time.clock() - clock_start
print("Simulation finished in %f secs. Pick the results in log files." % runsecs)

# 7. Show the graphical representation of the NoC

draw_noc(basicnoc)
 

Compare with Previous | Blame | View Log

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.