1 from __future__
import print_function
3 # Copyright (C) 2016, Elphel.inc. 4 # Simulation code for cocotb simulation for x393 project 6 # This program is free software: you can redistribute it and/or modify 7 # it under the terms of the GNU General Public License as published by 8 # the Free Software Foundation, either version 3 of the License, or 9 # (at your option) any later version. 11 # This program is distributed in the hope that it will be useful, 12 # but WITHOUT ANY WARRANTY; without even the implied warranty of 13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 # GNU General Public License for more details. 16 # You should have received a copy of the GNU General Public License 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 @author: Andrey Filippov 19 @copyright: 2016 Elphel, Inc. 21 @contact: andrey@elphel.com 28 from socket_command
import SocketCommand
30 from cocotb.triggers
import Timer
31 from x393interfaces
import MAXIGPMaster, PSBus, SAXIRdSim, SAXIWrSim
32 from cocotb.drivers
import BitDriver
33 from cocotb.triggers
import Timer, RisingEdge, ReadOnly
34 from cocotb.result
import ReturnValue, TestFailure, TestError, TestSuccess
39 if (max_items == 0)
or (len(lst) <= max_items):
45 fi = max_items-1
if max_items > 1
else max_items
53 class X393_cocotb_server(object):
54 INTR_ADDRESS = 0xfffffff0
55 INTM_ADDRESS = 0xfffffff4
56 RESERVED = (INTR_ADDRESS,INTM_ADDRESS)
57 writeIDMask = (1 <<12) -1
58 readIDMask = (1 <<12) -1
61 def __init__(self, dut, port, host, mempath=None, autoflush=True):
63 debug = os.getenv(
'COCOTB_DEBUG')
65 mempath = os.getenv(
'SIMULATION_PATH')+
"/"+
"memfile" 70 self.
cmd= SocketCommand()
77 self.dut._log.info (
"Created a new 'memory' file %s"%(mempath))
79 self._memfile.seek(self.
memhigh-1)
82 readOK = len(self._memfile.read(1))>0
83 self.dut._log.info (
"Read from 0x%08x"%(self.
memhigh-1))
88 self._memfile.seek(self.
memhigh-1)
89 self._memfile.write(chr(0))
91 self.dut._log.info(
"Wrote to 0x%08x to extend file to full size"%(self.
memhigh-1))
96 clock = dut.dutm0_aclk,
104 clock = dut.ps_sbus_clk)
109 clock = dut.axi_hclk,
116 clock = dut.axi_hclk,
125 clock = dut.axi_hclk,
134 clock = dut.saxi0_aclk,
141 level = logging.DEBUG
if debug
else logging.WARNING
142 self.dut._log.info(
'Set debug level '+str(level)+
", debug="+str(debug))
144 self.maxigp0.log.setLevel(level)
145 self.ps_sbus.log.setLevel(level)
146 self.saxihp0r.log.setLevel(level)
147 self.saxihp0w.log.setLevel(level)
148 self.saxihp1w.log.setLevel(level)
149 self.saxigp0.log.setLevel(level)
154 self.
socket_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
155 self.socket_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
157 self.socket_conn.bind((self.
HOST, self.
PORT))
158 self.dut._log.debug(
'Socket bind complete, HOST=%s, PORT=%d'%(self.
HOST,self.
PORT))
159 self.socket_conn.listen(1)
160 self.dut._log.info (
'Socket now listening to a single request on port %d: send command, receive response, close'%(self.
PORT))
161 except socket.error
as msg:
162 self.dut._log.info (
"Maybe you need to run 'killall vvp' to close previously opened socket?" )
163 self.
logErrorTerminate(
'Bind failed. Error Code : %s Message %s'%( str(msg[0]),msg[1]))
167 self.dut._log.error(msg)
168 cocotb.regression.tear_down()
169 raise TestFailure(msg)
175 self.soc_conn, soc_addr = self.socket_conn.accept()
176 self.dut._log.debug (
"Connected with %s"%(soc_addr[0] +
':' + str(soc_addr[1])))
178 line = self.soc_conn.recv(4096)
179 self.dut._log.debug(
"Received from socket: %s"%(line))
182 self.dut._log.debug(
"1.Received from socket: %s"%(line))
184 self.dut._log.debug(
"3.Received from socket: %s"%(line))
188 self.dut._log.debug(
"1.executeCommand: %s"%(line))
190 raise ReturnValue(
None)
191 self.dut._log.debug(
"2.executeCommand: %s"%(line))
192 self.cmd.fromJSON(line)
195 if self.cmd.getStart():
196 self.dut._log.info(
'Received START, waiting reset to be over')
199 while self.dut.reset_out.value.get_binstr() !=
"1":
201 while self.dut.reset_out.value:
208 self.soc_conn.send(self.cmd.toJSON(0)+
"\n")
209 self.dut._log.debug(
'Sent 0 to the socket')
212 elif self.cmd.getStop():
213 self.dut._log.info(
'Received STOP, closing...')
214 self.soc_conn.send(self.cmd.toJSON(0)+
"\n")
215 self.soc_conn.close()
218 self.socket_conn.shutdown(socket.SHUT_RDWR)
219 self.socket_conn.close()
220 cocotb.regression.tear_down()
222 raise TestSuccess(
'Terminating as received STOP command')
224 elif self.cmd.getWrite():
225 ad = self.cmd.getWrite()
226 self.dut._log.debug(
'Received WRITE, 0x%0x: %s'%(ad[0],
hex_list(ad[1])))
233 self._memfile.seek(addr)
235 sdata=struct.pack(
"<L",data)
236 self._memfile.write(sdata)
237 self.dut._log.debug(
"Written 'system memory': 0x%08x => 0x%08x"%(data,addr))
240 elif(ad[0] >= 0x40000000)
and (ad[0] < 0x80000000):
241 rslt =
yield self.maxigp0.axi_write(address = ad[0],
249 self.dut._log.debug(
'maxigp0.axi_write yielded %s'%(str(rslt)))
251 elif (ad[0] >= 0xc0000000)
and (ad[0] < 0xfffffffc):
252 self.ps_sbus.write_reg(ad[0],ad[1][0])
255 self.dut._log.info(
'Write address 0x%08x is outside of maxgp0, not yet supported'%(ad[0]))
257 self.dut._log.info(
'WRITE 0x%08x <= %s'%(ad[0],
hex_list(ad[1], max_items = 4)))
258 self.soc_conn.send(self.cmd.toJSON(rslt)+
"\n")
259 self.dut._log.debug(
'Sent rslt to the socket')
260 elif self.cmd.getRead():
261 ad = self.cmd.getRead()
262 self.dut._log.debug(str(ad))
263 if not isinstance(ad,(list,tuple)):
267 self.dut._log.debug(str(ad))
271 dval=[self.dut.irq_r.value.integer]
273 bv = self.dut.irq_r.value
274 bv.binstr = re.sub(
"[^1]",
"0",bv.binstr)
282 self._memfile.seek(addr)
283 self.dut._log.debug(
"read length="+str(len(self._memfile.read(4*ad[1]))))
285 self._memfile.seek(addr)
286 self.dut._log.debug(str(ad))
287 dval = list(struct.unpack(
"<"+
"L"*ad[1],self._memfile.read(4*ad[1])))
288 msg=
"'Written 'system memory: 0x%08x => "%(addr)
291 self.dut._log.debug(msg)
293 elif(ad[0] >= 0x40000000)
and (ad[0] < 0x80000000):
294 dval =
yield self.maxigp0.axi_read(address = ad[0],
300 self.dut._log.debug(
"axi_read returned 0x%08x => %s"%(ad[0],
hex_list(dval, max_items = 4)))
302 elif (ad[0]>= 0xc0000000)
and (ad[0] < 0xfffffffc):
303 dval =
yield self.ps_sbus.read_reg(ad[0])
305 self.dut._log.info(
'Read address 0x%08x is outside of maxgp0, not yet supported'%(ad[0]))
307 self.soc_conn.send(self.cmd.toJSON(dval)+
"\n")
308 self.dut._log.debug(
'Sent dval to the socket')
309 self.dut._log.info(
"READ 0x%08x =>%s"%(ad[0],
hex_list(dval, max_items = 4)))
310 elif self.cmd.getFlush():
311 self.dut._log.info(
'Received flush')
313 self.soc_conn.send(self.cmd.toJSON(0)+
"\n")
314 self.dut._log.debug(
'Sent 0 to the socket')
316 elif self.cmd.getWait():
318 int_dly = self.cmd.getWait()
320 num_clk= (int_dly[1] * self.
ACLK_FREQ) // 1000000000
321 self.dut._log.info(
'Received WAIT, interrupt mask = 0x%0x, timeout = %d ns, %d clocks'%(self.
int_mask,int_dly[1], num_clk))
323 for _
in range(num_clk):
324 yield RisingEdge(self.dut.dutm0_aclk)
326 irq_r=self.dut.irq_r.value.integer
328 bv = self.dut.irq_r.value
329 bv.binstr = re.sub(
"[^1]",
"0",bv.binstr)
334 self.soc_conn.send(self.cmd.toJSON(n)+
"\n")
335 self.dut._log.debug(
'Sent %d to the socket'%(n))
336 self.dut._log.info(
' WAIT over, passed %d ns'%((n * 1000000000)//self.
ACLK_FREQ))
338 self.dut._log.warning(
'Received unknown command: '+str(self.
cmd))
339 self.soc_conn.send(self.cmd.toJSON(1)+
"\n")
340 self.dut._log.debug(
'Sent 1 to the socket')
345 number = (number << 8) + ord(c)
350 tb = X393_cocotb_server(dut=dut, host =
"", port=7777)
351 dut._log.warn(
"Waiting for commnad on socket port %s"%(port))
354 rslt=
yield tb.receiveCommandFromSocket()
355 dut._log.debug(
"rslt = %s"%(str(rslt)))
356 except ReturnValue
as rv:
358 dut._log.info(
"rv = %s"%(str(rv)))
359 dut._log.info(
"line = %s"%(str(line)))
360 tb.socket_conn.close()
361 cocotb.regression.tear_down()
def run_test( dut, port=7777)
def receiveCommandFromSocket( self)
def __init__( self, dut, port, host, mempath=None, autoflush=True)
def logErrorTerminate( self, msg)
def executeCommand( self, line)
def hex_list( lst, max_items=0, frmt="0x%08x")