x393  1.0
FPGA code for Elphel NC393 camera
socket_command.py
Go to the documentation of this file.
1 from __future__ import print_function
2 """
3 # Copyright (C) 2016, Elphel.inc.
4 # Simulation code for cocotb simulation for x393 project
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 @author: Andrey Filippov
19 @copyright: 2016 Elphel, Inc.
20 @license: GPLv3.0+
21 @contact: andrey@elphel.com
22 """
23 import json
24 import socket
25 
class SocketCommand():
    """A single command (name plus optional arguments) with JSON (de)serialization.

    The command name is stored in ``command`` and its payload in
    ``arguments``.  The accessors below recognize the names "start",
    "stop", "write", "wait", "flush" and "read".
    """
    # Class-level defaults, shadowed by instance attributes set in __init__.
    command = None
    arguments = None

    def __init__(self, command=None, arguments=None):
        """Store the initial command name and arguments (both default to None)."""
        self.command = command
        self.arguments = arguments

    # ---- queries -----------------------------------------------------------
    def getCommand(self):
        """Return the current command name (or None)."""
        return self.command

    def getArgs(self):
        """Return the current arguments (or None)."""
        return self.arguments

    def getStart(self):
        """Return True if the current command is "start"."""
        return self.command == "start"

    def getStop(self):
        """Return True if the current command is "stop"."""
        return self.command == "stop"

    def getWrite(self):
        """Return the arguments if the current command is "write", else None."""
        return self.arguments if self.command == "write" else None

    def getWait(self):
        """Return the arguments if the current command is "wait", else None."""
        return self.arguments if self.command == "wait" else None

    def getFlush(self):
        """Return True if the current command is "flush"."""
        return self.command == "flush"

    def getRead(self):
        """Return the arguments if the current command is "read", else None."""
        return self.arguments if self.command == "read" else None

    # ---- setters -----------------------------------------------------------
    # Note: setStart/setStop/setFlush change only the command name; any
    # previously stored arguments are left untouched.
    def setStart(self):
        """Set the command to "start"."""
        self.command = "start"

    def setStop(self):
        """Set the command to "stop"."""
        self.command = "stop"

    def setWrite(self, arguments):
        """Set the command to "write" with the given arguments."""
        self.command = "write"
        self.arguments = arguments

    def setWait(self, arguments):
        """Set the command to "wait"; arguments are irq mask and timeout (ns)."""
        self.command = "wait"
        self.arguments = arguments

    def setFlush(self):
        """Set the command to "flush" (flush memory file, use when sync_for_*)."""
        self.command = "flush"

    def setRead(self, arguments):
        """Set the command to "read" with the given arguments."""
        self.command = "read"
        self.arguments = arguments

    # ---- serialization -----------------------------------------------------
    def toJSON(self, val=None):
        """Serialize to a JSON string.

        With ``val`` omitted (None) the current command is encoded as
        ``{"cmd": <command>, "args": <arguments>}``; otherwise ``val``
        itself is encoded and the stored command is ignored.
        """
        if val is None:
            return json.dumps({"cmd": self.command, "args": self.arguments})
        return json.dumps(val)

    def fromJSON(self, jstr):
        """Load command and arguments from the JSON string ``jstr``.

        A missing "cmd" or "args" key leaves the corresponding attribute
        as None; so does JSON that is not an object at all.  Malformed
        JSON still raises whatever ``json.loads`` raises.
        """
        parsed = json.loads(jstr)
        if not isinstance(parsed, dict):
            # Non-object payloads (lists, numbers, ...) carry no command.
            parsed = {}
        self.command = parsed.get('cmd')
        self.arguments = parsed.get('args')
78 
class x393Client():
    """Client that sends SocketCommand requests as JSON over a TCP socket.

    Every request opens a new connection to (HOST, PORT), sends one JSON
    string and reads a single reply before closing the connection.
    """

    def __init__(self, host='localhost', port=7777):
        """Remember the server address and create the reusable command object."""
        self.PORT = port
        self.HOST = host  # host name or address of the server to connect to
        self.cmd = SocketCommand()

    def communicate(self, snd_str):
        """Send ``snd_str`` to the server and return the raw reply.

        Text is encoded as UTF-8 before sending, because sockets accept
        only bytes on Python 3 (on Python 2 ``str`` is already bytes and
        is sent unchanged).  The reply is returned exactly as received
        and is limited to a single ``recv`` of at most 16K.
        """
        if not isinstance(snd_str, bytes):
            snd_str = snd_str.encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.HOST, self.PORT))
            # sendall() retries until the whole request is written;
            # plain send() may transmit only part of the buffer.
            sock.sendall(snd_str)
            return sock.recv(16384)  # limit reply to 16K
        finally:
            # Close the socket even if connect/send/recv raised.
            sock.close()

    def start(self):
        """Send the "start" command and print the reply."""
        self.cmd.setStart()
        print("start->", self.communicate(self.cmd.toJSON()))

    def stop(self):
        """Send the "stop" command and print the reply."""
        self.cmd.setStop()
        print("stop->", self.communicate(self.cmd.toJSON()))

    def write(self, address, data):
        """Send a "write" command with arguments [address, data]; the reply is discarded."""
        self.cmd.setWrite([address, data])
        self.communicate(self.cmd.toJSON())

    def waitIrq(self, irqMask, wait_ns):
        """Send a "wait" command with arguments [irqMask, wait_ns]; the reply is discarded."""
        self.cmd.setWait([irqMask, wait_ns])
        self.communicate(self.cmd.toJSON())

    def flush(self):
        """Send the "flush" command; the reply is discarded.

        The command used to be prepared but never transmitted, because
        the only communicate() call was inside a commented-out print.
        """
        self.cmd.setFlush()
        self.communicate(self.cmd.toJSON())

    def read(self, address):
        """Send a "read" command for ``address`` and return the JSON-decoded reply."""
        self.cmd.setRead(address)
        rslt = self.communicate(self.cmd.toJSON())
        if not isinstance(rslt, str):
            # Python 3 returns bytes; decode so json.loads works on every 3.x.
            rslt = rslt.decode('utf-8')
        return json.loads(rslt)
115 
def write( self, address, data)
def setRead( self, arguments)
def toJSON( self, val=None)
def waitIrq( self, irqMask, wait_ns)
def read( self, address)
def setWrite( self, arguments)
def communicate( self, snd_str)
def setWait( self, arguments)
def __init__( self, host='localhost', port=7777)
def __init__( self, command=None, arguments=None)