1 import os
2 import re
3 import select
4 import socket
5 import string
6 import sys
7 import thread
8 import time
9 from types import *
10 import SocketServer
11 from xml.dom.minidom import parseString
12 from teamwork.communication.generic import *
13
15 """Returns host and port of ANS, as set in KQML_ANS environment
16 variable, or as in default setting if no environment variable set"""
17 host = 'vibhagam.isi.edu'
18 port = 5500
19 try:
20 exp = re.match("(.*):(.*)",os.environ['KQML_ANS'])
21 if exp:
22 host = exp.group(1)
23 prot = int(exp.group(2))
24 except KeyError:
25 pass
26 return host,port
27
28 myClass = SocketServer.ThreadingTCPServer
29
31 port = 1200
32
34 try:
35 port = args['port']
36 except KeyError:
37 port = KQMLListener.port
38 self.connections = {}
39 self.queue = []
40 self.handlers = []
41 self.lock = thread.allocate_lock()
42
43 while 1:
44 try:
45 myClass.__init__(self,('127.0.0.1',port),KQMLConn)
46 break
47 except socket.error:
48 port += 1
49 print 'Listening on',self.server_address
50
52 thread.start_new_thread(self.serve_forever,())
53
54 - def addMsg(self,str,conn=None):
55 if conn:
56 name = conn.label()
57 else:
58 name = None
59 self.lock.acquire()
60 self.queue.insert(0,(name,str))
61 self.lock.release()
62 print 'New msg:',str
63 for handler in self.handlers:
64 apply(handler,([(name,str)],))
65
68
70 self.lock.acquire()
71 msgs = self.queue[:]
72 self.queue = []
73 self.lock.release()
74 return msgs
75
76 - def send(self,addr,str,retry=1):
77 try:
78 print 'Sending to:',addr
79 conn = self.connections[addr]
80 except KeyError:
81 for res in socket.getaddrinfo(addr[0], addr[1],
82 socket.AF_UNSPEC,socket.SOCK_STREAM):
83 af, socktype, proto, canonname, sa = res
84 try:
85 sock = socket.socket(af, socktype, proto)
86 except socket.error, msg:
87 sock = None
88 continue
89 try:
90 sock.connect(sa)
91 except socket.error, msg:
92 sock.close()
93 sock = None
94 continue
95 break
96 else:
97 print 'Unable to open socket'
98 return None
99
100
101 self.process_request(sock,addr)
102 for count in range(10):
103 if self.connections.has_key(addr):
104 break
105 time.sleep(1)
106 else:
107 print 'Unable to connect:',addr
108 return None
109 conn = self.connections[addr]
110 try:
111 conn.sendMsg(str)
112 except ValueError:
113 del self.connections[addr]
114 if retry:
115 self.sendMsg(addr,str,None)
116 else:
117 print 'Unable to send message.'
118 return conn
119
122
124 for conn in self.connections.values():
125 try:
126 conn.close()
127 except socket.error:
128 pass
129 myClass.server_close(self)
130 print 'Listener closed.'
131
132 -class KQMLConn(SocketServer.BaseRequestHandler):
134 self.name = None
135 print 'New connection:',self.client_address
136 self.server.connections[self.label()] = self
137 self.running = 1
138 while self.running:
139 time.sleep(1)
140 infd,outfd,errfd = select.select([self.request],[self.request],
141 [self.request])
142 if self.request in infd:
143 try:
144 data = os.read(self.request.fileno(),16384)
145 except OSError:
146 data = None
147 if data:
148 self.server.addMsg(string.strip(data),self)
149 else:
150 self.close()
151 print 'Connection closed:',self.label()
152
154 os.write(self.request.fileno(),str+'\n\r')
155 print 'Message sent'
156
158 if not self.name:
159 self.name = self.request.getpeername()
160 return self.name
161
163 del self.server.connections[self.label()]
164 self.running = None
165
167 str = '\n'+doc.nodeName
168 if doc.hasChildNodes():
169 node = doc.firstChild
170 while node:
171 substr = printTree(node)
172 str += substr.replace('\n','\n\t')
173 node = node.nextSibling
174 return str
175
177 router.registerHandler(lambda m: sys.stdout.write(`m`+'\n'))
178 router.start()
179 addr = None
180 while 1:
181 str = sys.stdin.readline()
182 str = string.strip(str)
183 if str == 'quit':
184 break
185 elif str == 'get':
186 msgs = router.receive()
187 print msgs
188 elif str == 'reply':
189 addr = msgs[0][0]
190 else:
191 if not addr:
192 pos = string.index(str,' ')
193 addr = ('127.0.0.1',int(str[:pos]))
194 str = str[pos+1:]
195 router.send(addr,str)
196 addr = None
197
198 if __name__ == "__main__" :
199 import time
200
201 args = {}
202 try:
203 args['port'] = int(sys.argv[1])
204 except IndexError:
205 pass
206 router = KQMLListener(args)
207
208 try:
209 interactive(router)
210 except KeyboardInterrupt:
211 pass
212 router.server_close()
213