Package teamwork :: Package communication :: Module SocketListener
[hide private]
[frames | no frames]

Source Code for Module teamwork.communication.SocketListener

  1  import os 
  2  import re 
  3  import select 
  4  import socket 
  5  import string 
  6  import sys 
  7  import thread 
  8  import time 
  9  from types import * 
 10  import SocketServer 
 11  from xml.dom.minidom import parseString 
 12  from teamwork.communication.generic import * 
 13   
def getANS():
    """Return the (host, port) pair of the ANS.

    The KQML_ANS environment variable, when set and of the form
    "host:port", overrides both values.  When the variable is missing or
    does not contain a colon, the built-in defaults are returned.

    Raises ValueError if the port part of KQML_ANS is not an integer.
    """
    host = 'vibhagam.isi.edu'
    port = 5500
    try:
        setting = os.environ['KQML_ANS']
    except KeyError:
        return host,port
    exp = re.match(r"(.*):(.*)",setting)
    if exp:
        host = exp.group(1)
        # Previously assigned to a misspelled name ("prot"), so the
        # configured port was parsed and then thrown away.
        port = int(exp.group(2))
    return host,port
# Server base class used by KQMLListener below: the listener inherits from
# it and calls its __init__ and server_close explicitly through this alias.
myClass = SocketServer.ThreadingTCPServer
30 -class KQMLListener(GenericCommunication,myClass):
31 port = 1200 32
33 - def __init__(self,args={}):
34 try: 35 port = args['port'] 36 except KeyError: 37 port = KQMLListener.port 38 self.connections = {} 39 self.queue = [] 40 self.handlers = [] 41 self.lock = thread.allocate_lock() 42 # Start server on first available port 43 while 1: 44 try: 45 myClass.__init__(self,('127.0.0.1',port),KQMLConn) 46 break 47 except socket.error: 48 port += 1 49 print 'Listening on',self.server_address
50
51 - def start(self):
52 thread.start_new_thread(self.serve_forever,())
53
54 - def addMsg(self,str,conn=None):
55 if conn: 56 name = conn.label() 57 else: 58 name = None 59 self.lock.acquire() 60 self.queue.insert(0,(name,str)) 61 self.lock.release() 62 print 'New msg:',str 63 for handler in self.handlers: 64 apply(handler,([(name,str)],))
65
66 - def registerHandler(self,fun):
67 self.handlers.append(fun)
68
69 - def receive(self):
70 self.lock.acquire() 71 msgs = self.queue[:] 72 self.queue = [] 73 self.lock.release() 74 return msgs
75
76 - def send(self,addr,str,retry=1):
77 try: 78 print 'Sending to:',addr 79 conn = self.connections[addr] 80 except KeyError: 81 for res in socket.getaddrinfo(addr[0], addr[1], 82 socket.AF_UNSPEC,socket.SOCK_STREAM): 83 af, socktype, proto, canonname, sa = res 84 try: 85 sock = socket.socket(af, socktype, proto) 86 except socket.error, msg: 87 sock = None 88 continue 89 try: 90 sock.connect(sa) 91 except socket.error, msg: 92 sock.close() 93 sock = None 94 continue 95 break 96 else: 97 print 'Unable to open socket' 98 return None 99 ## sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 100 ## sock.connect(addr) 101 self.process_request(sock,addr) 102 for count in range(10): 103 if self.connections.has_key(addr): 104 break 105 time.sleep(1) 106 else: 107 print 'Unable to connect:',addr 108 return None 109 conn = self.connections[addr] 110 try: 111 conn.sendMsg(str) 112 except ValueError: 113 del self.connections[addr] 114 if retry: 115 self.sendMsg(addr,str,None) 116 else: 117 print 'Unable to send message.' 118 return conn
119
120 - def stop(self):
121 self.server_close()
122
123 - def server_close(self):
124 for conn in self.connections.values(): 125 try: 126 conn.close() 127 except socket.error: 128 pass 129 myClass.server_close(self) 130 print 'Listener closed.'
131
132 -class KQMLConn(SocketServer.BaseRequestHandler):
133 - def handle(self):
134 self.name = None 135 print 'New connection:',self.client_address 136 self.server.connections[self.label()] = self 137 self.running = 1 138 while self.running: 139 time.sleep(1) 140 infd,outfd,errfd = select.select([self.request],[self.request], 141 [self.request]) 142 if self.request in infd: 143 try: 144 data = os.read(self.request.fileno(),16384) 145 except OSError: 146 data = None 147 if data: 148 self.server.addMsg(string.strip(data),self) 149 else: 150 self.close() 151 print 'Connection closed:',self.label()
152
153 - def sendMsg(self,str):
154 os.write(self.request.fileno(),str+'\n\r') 155 print 'Message sent'
156
157 - def label(self):
158 if not self.name: 159 self.name = self.request.getpeername() 160 return self.name
161
162 - def close(self):
163 del self.server.connections[self.label()] 164 self.running = None
165
def printTree(doc):
    """Render a DOM node and its descendants as an indented outline.

    Each node contributes a newline followed by its nodeName; the outline
    of every child is nested one tab deeper than its parent.
    """
    pieces = ['\n'+doc.nodeName]
    child = None
    if doc.hasChildNodes():
        child = doc.firstChild
    while child:
        # Push the child's whole outline one level to the right
        pieces.append(printTree(child).replace('\n','\n\t'))
        child = child.nextSibling
    return ''.join(pieces)
175
def interactive(router):
    """Drive `router` from commands typed on standard input.

    One command per line:
      quit            leave the loop
      get             fetch the queued messages from the router and print them
      reply           address the next message to the sender of the first
                      message returned by the most recent `get`
      <port> <text>   send <text> to 127.0.0.1:<port>
    After a successful `reply`, the next non-command line is sent unchanged
    to the stored address.  End of input also ends the loop.

    Incoming messages are echoed to standard output as they arrive.
    """
    router.registerHandler(lambda m: sys.stdout.write(repr(m)+'\n'))
    router.start()
    addr = None
    msgs = []
    while 1:
        line = sys.stdin.readline()
        if not line:
            # readline() returns '' only at end of input; without this
            # check the loop crashed while parsing the empty string
            break
        line = line.strip()
        if line == 'quit':
            break
        elif line == 'get':
            msgs = router.receive()
            sys.stdout.write(str(msgs)+'\n')
        elif line == 'reply':
            if msgs:
                addr = msgs[0][0]
            else:
                # Nothing fetched yet (or the last `get` was empty)
                sys.stdout.write('No message to reply to\n')
        else:
            if not addr:
                try:
                    pos = line.index(' ')
                    addr = ('127.0.0.1',int(line[:pos]))
                except ValueError:
                    # No separator, or the port is not a number
                    sys.stdout.write('Expected: <port> <message>\n')
                    continue
                line = line[pos+1:]
            router.send(addr,line)
            addr = None
if __name__ == "__main__":
    # Optional first command-line argument: the first port to try
    args = {}
    try:
        args['port'] = int(sys.argv[1])
    except IndexError:
        pass
    router = KQMLListener(args)

    # Close the listener however the interactive loop ends, not only on a
    # normal return or Ctrl-C
    try:
        try:
            interactive(router)
        except KeyboardInterrupt:
            pass
    finally:
        router.server_close()