1 from __future__
import print_function
5 # Copyright (C) 2016, Elphel.inc. 6 # Implementation of AXI4-based buses used in Elphel x393 camera project 8 # This program is free software: you can redistribute it and/or modify 9 # it under the terms of the GNU General Public License as published by 10 # the Free Software Foundation, either version 3 of the License, or 11 # (at your option) any later version. 13 # This program is distributed in the hope that it will be useful, 14 # but WITHOUT ANY WARRANTY; without even the implied warranty of 15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 # GNU General Public License for more details. 18 # You should have received a copy of the GNU General Public License 19 # along with this program. If not, see <http://www.gnu.org/licenses/>. 20 @author: Andrey Filippov 21 @copyright: 2016 Elphel, Inc. 23 @contact: andrey@elphel.com 25 Uses code from https://github.com/potentialventures/cocotb/blob/master/cocotb/drivers/amba.py 26 Below are the copyright/license notices of the amba.py 27 ------------------------------------------------------ 29 Copyright (c) 2014 Potential Ventures Ltd 32 Redistribution and use in source and binary forms, with or without 33 modification, are permitted provided that the following conditions are met: 34 * Redistributions of source code must retain the above copyright 35 notice, this list of conditions and the following disclaimer. 36 * Redistributions in binary form must reproduce the above copyright 37 notice, this list of conditions and the following disclaimer in the 38 documentation and/or other materials provided with the distribution. 39 * Neither the name of Potential Ventures Ltd, 40 SolarFlare Communications Inc nor the 41 names of its contributors may be used to endorse or promote products 42 derived from this software without specific prior written permission. 
44 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 45 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 46 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 47 DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY 48 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 49 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 50 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 51 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 52 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 53 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ''' 58 from cocotb.triggers
import Timer, RisingEdge, FallingEdge, Edge, ReadOnly, Lock
59 from cocotb.drivers
import BusDriver
60 from cocotb.result
import ReturnValue
61 from cocotb.binary
import BinaryValue
77 if not isinstance (signals,(list,tuple)):
79 for signal
in signals:
81 v.binstr =
"z" * len(signal)
86 class MAXIGPReadError(Exception):
90 class PSBus(BusDriver):
92 Small subset of Zynq registers, used to access SAXI_HP* registers 102 BusDriver.__init__(self, entity, name, clock)
104 self.bus.wr.setimmediatevalue(0)
105 self.bus.rd.setimmediatevalue(0)
111 yield self.busy_channel.acquire()
113 if not int(self.clock):
114 yield RisingEdge(self.clock)
115 self.bus.addr <= addr
118 yield RisingEdge(self.clock)
121 self.busy_channel.release()
125 yield self.busy_channel.acquire()
127 if not int(self.clock):
128 yield RisingEdge(self.clock)
129 self.bus.addr <= addr
131 yield RisingEdge(self.clock)
133 data = self.bus.dout.value.integer
135 bv = self.bus.dout.value
136 bv.binstr = re.sub(
"[^1]",
"0",bv.binstr)
140 self.busy_channel.release()
141 raise ReturnValue(data)
143 class SAXIWrSim(BusDriver):
145 Connects to host side of simul_axi_wr (just writes to system memory) (both GP and HP) 146 No locks are used, single instance should be connected to a particular port 163 def __init__(self, entity, name, clock, mempath, memhigh=0x40000000, data_bytes=8, autoflush=True, blatency=5):
165 @param entity Device under test 166 @param name port names prefix (DUT has I/O ports <name>_<signal>) 167 @param clock clock that drives this interface 168 @param mempath operating system path of the memory image (1GB now - 0..0x3fffffff) 169 @param memhigh memory high address 170 @param data_bytes data width, in bytes 171 @param autoflush flush file after each write 172 @param blatency number of cycles to delay write response (b) channel 175 BusDriver.__init__(self, entity, name, clock)
177 self.log.debug (
"SAXIWrSim: name='%s', mempath='%s', memhigh=0x%08x, data_bytes=%d, autoflush=%s, blatency=%d"%
178 (name,mempath,memhigh,data_bytes, str(autoflush), blatency))
185 self.log.info (
"SAXIWrSim(%s): created a new 'memory' file %s"%(name,mempath))
187 self._memfile.seek(memhigh-1)
190 readOK = len(self._memfile.read(1))>0
191 self.log.debug (
"Read from 0x%08x"%(memhigh-1))
196 self._memfile.seek(memhigh-1)
197 self._memfile.write(chr(0))
198 self._memfile.flush()
199 self.log.info(
"Wrote to 0x%08x to extend file to full size"%(memhigh-1))
202 self.bus.wr_ready.setimmediatevalue(1)
203 self.bus.bresp_latency.setimmediatevalue(blatency)
220 self.log.debug (
"SAXIWrSim(%s) init done"%(self.
name))
223 self._memfile.flush()
227 self.log.debug (
"SAXIWrSim(%s).saxi_wr_run"%(self.
name))
229 if not self.bus.wr_ready.value:
233 if self.bus.wr_valid.value:
235 yield RisingEdge(self.clock)
239 address = self.bus.wr_address.value.integer
241 self.log.warning (
"SAXIWrSim() tried to write to unknown memory address")
243 yield RisingEdge(self.clock)
246 self.log.warning (
"SAXIWrSim() Write memory address is not aligned to %d-byte words"%(self.
_data_bytes))
248 self._memfile.seek(address)
251 data = self.bus.wr_data.value.integer
253 self.log.warning (
"SAXIWrSim() writing undefined data")
254 bv = self.bus.wr_data.value
255 bv.binstr = re.sub(
"[^1]",
"0",bv.binstr)
257 sdata=struct.pack(self.
_fmt,data)
258 bv = self.bus.wr_data.value
259 bv.binstr= re.sub(
"[^0]",
"1",bv.binstr)
261 bv.binstr =
"1"+bv.binstr
263 self._memfile.write(sdata)
266 if bv.binstr[-1-i] != 0:
267 self._memfile.write(sdata[i])
269 self._memfile.seek(1,1)
271 self._memfile.flush()
272 self.log.debug (
"SAXIWrSim() 0x%x -> 0x%x, mask = 0x%x"%(address,data,bv.integer))
273 yield RisingEdge(self.clock)
276 class SAXIRdSim(BusDriver):
278 Connects to host side of simul_axi_rd (just reads from system memory) (both GP and HP) 279 No locks are used, single instance should be connected to a particular port 293 def __init__(self, entity, name, clock, mempath, memhigh=0x40000000, data_bytes=8):
295 @param entity Device under test 296 @param name port names prefix (DUT has I/O ports <name>_<signal>) 297 @param clock clock that drives this interface 298 @param mempath operating system path of the memory image (1GB now - 0..0x3fffffff) 299 @param memhigh memory high address 300 @param data_bytes data width, in bytes 304 BusDriver.__init__(self, entity, name, clock)
306 self.log.debug (
"SAXIRdSim: name='%s', mempath='%s', memhigh=0x%08x, data_bytes=%d"%
307 (name,mempath,memhigh,data_bytes))
314 self.log.info (
"SAXIRdSim(%s): created a new 'memory' file %s"%(name,mempath))
316 self._memfile.seek(memhigh-1)
319 readOK = len(self._memfile.read(1))>0
320 self.log.debug (
"Read from 0x%08x"%(memhigh-1))
325 self._memfile.seek(memhigh-1)
326 self._memfile.write(chr(0))
327 self._memfile.flush()
328 self.log.info(
"Wrote to 0x%08x to extend file to full size"%(memhigh-1))
330 self.bus.rd_valid.setimmediatevalue(0)
348 self.log.debug(
"SAXIRdSim(%s) init done"%(self.
name))
352 self.log.info (
"SAXIRdSim(%s).saxi_test"%(self.
name))
357 self.log.info (
"SAXIRdSim(%s).saxi_wr_run"%(self.
name))
362 yield FallingEdge(self.clock)
363 if self.bus.rd_ready.value:
365 self.bus.rd_valid <= 1
369 address = self.bus.rd_address.value.integer
371 self.log.warning (
"SAXIRdSim() tried to write to unknown memory address")
374 self.log.warning (
"SAXIRdSim() Write memory address is not aligned to %d-byte words"%(self.
_data_bytes))
376 self._memfile.seek(address)
381 self.log.warning (
"SAXIRdSim() failed reading %d bytes form 0x%08x"%(self.
_data_bytes, address))
385 data = struct.unpack(self.
_fmt,rs)
387 self.log.warning (
"SAXIRdSim():Can not unpack memory data @ address 0x%08x"%(address))
389 if (
not address
is None)
and (
not data
is None):
390 self.bus.rd_resp <= 0
391 self.bus.rd_data <= data
393 self.bus.rd_resp <= 2
396 self.bus.rd_valid <= 1
397 yield RisingEdge(self.clock)
398 self.bus.rd_valid <= 0
405 class MAXIGPMaster(BusDriver):
407 Implements subset of AXI4 used in x393 project for Xilinx Zynq MAXIGP* 457 _channels = [AR_CHN,AW_CHN,R_CHN,W_CHN,B_CHN]
458 def __init__(self, entity, name, clock, rdlag=None, blag=None):
459 BusDriver.__init__(self, entity, name, clock)
464 self.log.debug (
"MAXIGPMaster.__init__(): super done")
467 self.bus.rready.setimmediatevalue(1)
469 self.bus.xtra_rdlag.setimmediatevalue(rdlag)
471 self.bus.bready.setimmediatevalue(1)
473 self.bus.xtra_blag.setimmediatevalue(blag)
474 self.bus.awvalid.setimmediatevalue(0)
475 self.bus.wvalid.setimmediatevalue(0)
476 self.bus.arvalid.setimmediatevalue(0)
478 self.bus.arlock.setimmediatevalue(0)
479 self.bus.arcache.setimmediatevalue(0)
480 self.bus.arprot.setimmediatevalue(0)
481 self.bus.arqos.setimmediatevalue(0)
482 self.bus.awlock.setimmediatevalue(0)
483 self.bus.awcache.setimmediatevalue(0)
484 self.bus.awprot.setimmediatevalue(0)
485 self.bus.awqos.setimmediatevalue(0)
486 self.busy_channels = {}
487 self.log.debug (
"MAXIGPMaster.__init__(): pre-lock done")
490 for chn
in self._channels:
491 self.log.debug (
"MAXIGPMaster.__init__(): chn = %s"%(chn))
492 self.busy_channels[chn]=Lock(
"%s_%s_busy"%(name,chn))
495 def _send_write_address(self, address, delay, id, dlen, dsize, burst):
497 Send write address with parameters 498 @param address binary byte address for (first) burst start 499 @param delay Latency sending address in clock cycles 500 @param id transaction ID 501 @param dlen burst length (1..16) 502 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 503 @param burst burst type (0 - fixed, 1 - increment, 2 - wrap, 3 - reserved) 506 yield self.busy_channels[AW_CHN].acquire()
507 self.log.debug (
"MAXIGPMaster._send_write_address(): acquired lock")
508 for _
in range(delay):
509 yield RisingEdge(self.clock)
510 self.log.debug (
"MAXIGPMaster._send_write_address(): delay over")
511 self.bus.awvalid <= 1
513 self.bus.awsize <= dsize
514 self.bus.awburst <= burst
516 self.bus.awaddr <= address
517 address += 16*(1 << dsize)
522 if self.bus.awready.value:
524 yield RisingEdge(self.clock)
525 yield RisingEdge(self.clock)
526 self.bus.awaddr <= address
527 self.bus.awlen <= dlen -1
528 self.log.debug (
"1.MAXIGPMaster._send_write_address(), address=0x%08x, dlen = 0x%x"%(address, dlen))
531 if self.bus.awready.value:
533 yield RisingEdge(self.clock)
534 yield RisingEdge(self.clock)
535 self.bus.awvalid <= 0
537 _float_signals((self.bus.awaddr,self.bus.awid, self.bus.awlen, self.bus.awsize,self.bus.awburst))
538 self.busy_channels[AW_CHN].release()
539 self.log.debug (
"MAXIGPMaster._send_write_address(): released lock %s"%(AW_CHN))
542 def _send_read_address(self, address, delay, id, dlen, dsize, burst):
544 Send read address with parameters 545 @param address binary byte address for (first) burst start 546 @param delay Latency sending address in clock cycles 547 @param id transaction ID 548 @param dlen burst length (1..16) 549 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 550 @param burst burst type (0 - fixed, 1 - increment, 2 - wrap, 3 - reserved) 553 yield self.busy_channels[AR_CHN].acquire()
554 self.log.debug (
"MAXIGPMaster._send_write_address(): acquired lock")
555 for _
in range(delay):
556 yield RisingEdge(self.clock)
557 self.log.debug (
"MAXIGPMaster._send_read_address(): delay over")
558 self.bus.arvalid <= 1
560 self.bus.arsize <= dsize
561 self.bus.arburst <= burst
563 self.bus.araddr <= address
564 address += 16*(1 << dsize)
569 if self.bus.arready.value:
571 yield RisingEdge(self.clock)
572 yield RisingEdge(self.clock)
573 self.bus.araddr <= address
574 self.bus.arlen <= dlen -1
575 self.log.debug (
"1.MAXIGPMaster._send_read_address(), address=0x%08x, dlen = 0x%x"%(address, dlen))
578 if self.bus.arready.value:
580 yield RisingEdge(self.clock)
581 yield RisingEdge(self.clock)
582 self.bus.arvalid <= 0
584 _float_signals((self.bus.araddr,self.bus.arid, self.bus.arlen, self.bus.arsize,self.bus.arburst))
585 self.busy_channels[AR_CHN].release()
586 self.log.debug (
"MAXIGPMaster._send_read_address(): released lock %s"%(AR_CHN))
589 def _send_write_data(self, data, wrstb, delay, id, dsize):
591 Send a data word or a list of data words (supports multi-burst) 592 @param data a list/tuple of words to send 593 @param wrstb - write mask list (same size as data) 594 @param delay latency in clock cycles 595 @param id transaction ID 596 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 598 self.log.debug (
"MAXIGPMaster._send_write_data("+str(data)+
", "+str(wrstb)+
", "+str(delay)+
", "+str(id)+
", "+str(dsize))
599 yield self.busy_channels[W_CHN].acquire()
600 self.log.debug (
"MAXIGPMaster._send_write_data(): acquired lock")
601 for cycle
in range(delay):
602 yield RisingEdge(self.clock)
603 self.log.debug (
"MAXIGPMaster._send_write_data(): delay over")
606 for i,val_wstb
in enumerate(zip(data,wrstb)):
607 self.log.debug (
"MAXIGPMaster._send_write_data(), i= %d, val_stb=%s "%(i,str(val_wstb)))
608 if (i == (len(data) - 1))
or ((i % 16) == 15):
612 self.bus.wdata <= val_wstb[0]
613 self.bus.wstb <= val_wstb[1]
616 if self.bus.wready.value:
618 yield RisingEdge(self.clock)
619 yield RisingEdge(self.clock)
623 self.busy_channels[W_CHN].release()
624 self.log.debug (
"MAXIGPMaster._send_write_data(): released lock %s"%(W_CHN))
625 raise ReturnValue(dsize)
629 def _get_read_data(self, address, id, dlen, dsize, delay):
631 Receive a data word or a list of data words (supports multi-burst) 632 @param address start address to read data from (just for logging) 633 @param id expected receive data ID 634 @param dlen number of words to read 635 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 636 @param delay latency in clock cycles 638 self.log.debug (
"MAXIGPMaster._get_read_data("+str(address)+
", "+str(id)+
", "+str(dlen)+
", "+str(dsize)+
", "+str(delay))
639 yield self.busy_channels[R_CHN].acquire()
640 self.log.debug (
"MAXIGPMaster._get_read_data(): acquired lock")
641 for cycle
in range(delay):
642 yield RisingEdge(self.clock)
643 self.log.debug (
"MAXIGPMaster._get_read_data(): delay over")
646 for i
in range(dlen):
647 self.log.debug (
"MAXIGPMaster._get_read_data(), i= %d"%(i))
650 if self.bus.rvalid.value:
652 data.append(self.bus.rdata.value.integer)
654 bv = self.bus.rdata.value
655 bv.binstr = re.sub(
"[^1]",
"0",bv.binstr)
656 data.append(bv.integer)
657 rid = int(self.bus.rid.value)
659 self.log.error(
"Read data 0x%x ID mismatch - expected: 0x%x, got 0x%x"%(address+i,id, rid))
661 yield RisingEdge(self.clock)
662 yield RisingEdge(self.clock)
666 self.busy_channels[R_CHN].release()
667 self.log.debug (
"MAXIGPMaster._get_read_data(): released lock %s"%(R_CHN))
668 raise ReturnValue(data)
673 def axi_write(self, address, value, byte_enable=None,
674 id=0, dsize=2, burst=1,address_latency=0,
676 self.log.debug(
"axi_write")
679 @param address binary byte address for burst start 680 @param value - a value or a list of values (supports multi-burst, but no interrupts between bursts) 681 @param byte_enable - byte enable mask. Should be None (all enabled) or have the same number of items as data 682 @param id transaction ID 683 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 684 @param burst burst type (0 - fixed, 1 - increment, 2 - wrap, 3 - reserved) 685 @param address_latency latency sending address in clock cycles 686 @param data_latency latency sending data in clock cycles 689 if not int(self.clock):
690 yield RisingEdge(self.clock)
694 if not isinstance(value, (list,tuple)):
696 if not isinstance(byte_enable, (list,tuple)):
697 if byte_enable
is None:
698 byte_enable = (1 << (1 << dsize)) - 1
699 byte_enable = [byte_enable]*len(value)
700 if None in byte_enable:
701 for i
in range(len(byte_enable)):
702 if byte_enable[i]
is None:
703 byte_enable[i] = (1 << (1 << dsize)) - 1
710 c_addr = cocotb.fork(self._send_write_address(address= address,
711 delay= address_latency,
717 c_data = cocotb.fork(self._send_write_data(data = value,
724 self.log.debug (
"c_addr.join()")
727 self.log.debug (
"c_data.join()")
730 self.log.debug (
"axi_write:All done")
734 def axi_read(self, address, id = 0, dlen = 1, dsize = 2, burst = 1, address_latency = 0, data_latency= 0 ):
736 Receive data from AXI port 737 @param address start address to read data from 738 @param id expected receive data ID 739 @param dlen number of words to read 740 @param dsize - data width - (1 << dsize) bytes (MAXIGP has only 2 bits while AXI specifies 3) 2 means 32 bits 741 @param burst burst type (0 - fixed, 1 - increment, 2 - wrap, 3 - reserved) 742 @param address_latency latency sending address in clock cycles 743 @param data_latency latency receiving data in clock cycles 744 @return A list of integers (the data words read) 747 if not int(self.clock):
748 yield RisingEdge(self.clock)
750 c_addr = cocotb.fork(self._send_read_address(address= address,
751 delay= address_latency,
757 c_data = cocotb.fork(self._get_read_data (address= address,
761 delay = data_latency))
763 self.log.debug (
"c_addr.join()")
766 self.log.debug (
"c_data.join()")
767 data_rv=
yield c_data.join()
769 self.log.debug (
"axi_read:All done, returning, data_rv="+str(data_rv))
770 raise ReturnValue(data_rv)
def read_reg( self, addr)
def __init__( self, entity, name, clock, mempath, memhigh=0x40000000, data_bytes=8)
def __init__( self, entity, name, clock, mempath, memhigh=0x40000000, data_bytes=8, autoflush=True, blatency=5)
def __init__( self, entity, name, clock)
def write_reg( self, addr, data)
def _float_signals( signals)