Package teamwork :: Package communication :: Module KQML
[hide private]
[frames] | no frames]

Source Code for Module teamwork.communication.KQML

  1  import socket 
  2  import SocketServer 
  3  import string 
  4  import thread 
  5  from threading import * 
  6  from types import * 
  7  from UserDict import UserDict 
  8   
  9  from teamwork.communication.generic import * 
 10   
class KQMLListener(GenericCommunication, SocketServer.ThreadingTCPServer):
    """Threaded TCP endpoint that exchanges KQML messages with named peers.

    Request handlers deliver incoming messages through addMsg() and
    announce themselves through addConnection(); receive() drains the
    queued messages and send() writes through the handler registered
    under the receiver's name.  self.lock guards both self.queue and
    self.connections.
    """

    # Value of the :reply-with field of the registration message
    LABEL_REGISTER = 'register'

    def __init__(self, args):
        """Bind the server socket on localhost.

        args is a dictionary with these optional entries:
          'port' -- port to bind; if binding fails the socket.error is
                    re-raised.  Without it, ports are tried upward from
                    13579 until one can be bound.
          'ANS'  -- address handed to socket.connect(); the connection is
                    opened in a background thread under the name 'ANS'.
        """
        try:
            self.port = int(args['port'])
            port = self.port
        except KeyError:
            self.port = None
            port = 13579
        bound = 0
        while not bound:
            try:
                # Qualified through SocketServer: the bare name TCPServer
                # is not imported by this module.
                SocketServer.ThreadingTCPServer.__init__(
                    self, ('localhost', port), KQMLRequestHandler)
                bound = 1
            except socket.error:
                if self.port is not None:
                    # The caller asked for this specific port
                    raise
                port = port + 1
        self.port = port
        self.host = socket.gethostname()
        print('Successful socket bind: %s' % (self.socket.getsockname(),))
        self.lock = Lock()
        self.queue = []
        self.connections = {}
        # If ANS info, then register
        if 'ANS' in args:
            addr = args['ANS']
            thread.start_new_thread(self.connect, ('ANS', addr))

    def register(self, name):
        """Send a 'register' message for name over the 'ANS' connection.

        The content is "(name host port)".  Returns None in every case;
        nothing is sent when no handler is registered under 'ANS'.
        """
        try:
            handler = self.connections['ANS']
        except KeyError:
            return None
        # self.port is an int and must be converted before concatenation
        handler.send('(register :content (' + name + ' ' +
                     self.host + ' ' + repr(self.port) + ') :sender ' + name +
                     ' :reply-with ' + self.LABEL_REGISTER + ')')

    def connect(self, name, addr):
        """Open an outbound connection to addr and serve it under name.

        Constructing the request handler runs its handle() loop, so this
        call only returns once the peer closes the connection; it is
        meant to be run in its own thread.  The socket is closed on exit.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(addr)
            KQMLRequestHandler(sock, addr, self, name)
        finally:
            sock.close()

    def start(self):
        """Run serve_forever() in a new thread."""
        self.listener = thread.start_new_thread(self.serve_forever, ())

    def send(self, msg):
        """Send msg to the peer named by msg['receiver'].

        If msg carries an 'address' entry it is removed from the message
        and a new connection to that address is opened first.  Raises
        KeyError when msg has no 'receiver' or when no address was given
        and no connection is registered for the receiver.
        """
        # The file only imports time under __main__, so import it here
        import time
        if 'address' in msg:
            # Message has (host,port) in it
            addr = msg['address']
            del msg['address']
        else:
            # Need to look up (host,port)
            addr = None
        receiver = msg['receiver']
        if addr:
            thread.start_new_thread(self.connect, (receiver, addr))
            # NOTE: this wait does not end if the connection attempt
            # fails and the receiver is never registered.
            ready = None
            while not ready:
                time.sleep(1)
                self.lock.acquire()
                try:
                    ready = receiver in self.connections
                finally:
                    self.lock.release()
        self.lock.acquire()
        try:
            handler = self.connections[receiver]
        finally:
            self.lock.release()
        handler.send(repr(msg))

    def addMsg(self, msg):
        """Queue an incoming message (newest at the front of the list)."""
        self.lock.acquire()
        try:
            self.queue.insert(0, msg)
        finally:
            self.lock.release()

    def receive(self):
        """Remove and return all queued messages, oldest first."""
        self.lock.acquire()
        try:
            # The queue holds the newest message first
            msgs = self.queue[::-1]
            del self.queue[:]
        finally:
            self.lock.release()
        return msgs

    def addConnection(self, name, handler):
        """Register handler as the connection to the peer called name."""
        self.lock.acquire()
        try:
            self.connections[name] = handler
        finally:
            self.lock.release()
96
class KQMLRequestHandler(SocketServer.StreamRequestHandler):
    """Per-connection handler: reads one KQML message per line.

    Every parsed message is passed to server.addMsg().  The handler
    registers itself with server.addConnection() under its peer name,
    which is either supplied at construction or taken from the 'sender'
    field of the first message that has one.
    """

    def __init__(self, request, client_address, server, name=None):
        """Serve request; name, if given, is the peer's name.

        As with any SocketServer request handler, construction runs
        setup(), handle() and finish(), so it returns only after the
        connection has been served.
        """
        self.name = name
        SocketServer.BaseRequestHandler.__init__(self, request,
                                                 client_address, server)

    def setup(self):
        """Create the stream files, then register a pre-named handler.

        Registration happens after the base setup so that the handler is
        never visible in the server's connection table before self.wfile
        exists and send() can work.
        """
        SocketServer.StreamRequestHandler.setup(self)
        if self.name:
            self.server.addConnection(self.name, self)

    def handle(self):
        """Read lines until end of stream, queueing each as a message."""
        line = self.rfile.readline()
        while line:
            msg = KQMLMessage(line.strip())
            if not self.name and 'sender' in msg:
                # First identified message names this connection
                self.name = msg['sender']
                self.server.addConnection(self.name, self)
            self.server.addMsg(msg)
            line = self.rfile.readline()

    def send(self, str):
        """Write str (its repr() if it is not a string) plus CRLF."""
        text = str
        if not isinstance(text, type('')):
            text = repr(text)
        self.wfile.write(text + '\r\n')
119
class KQMLMessage(GenericMessage, UserDict):
    """Dictionary-style KQML message.

    Keys are the field names without their leading ':'; the word that
    appears without a field name is stored under 'performative'.
    """

    # Opening delimiter of a multi-word value -> its closing delimiter
    _CLOSERS = {'"': '"', '(': ')'}

    def __init__(self, initialdata=None):
        """Build a message from a KQML string, a mapping, or nothing."""
        if initialdata:
            if isinstance(initialdata, str):
                UserDict.__init__(self, self.parse(initialdata))
            else:
                UserDict.__init__(self, initialdata)
        else:
            UserDict.__init__(self)

    def parse(self, str):
        """Turn the string form of a KQML message into a dictionary.

        The text is split on whitespace.  A value may be a single word,
        or several words enclosed in double quotes or parentheses; the
        enclosing delimiters are dropped.  Parentheses inside a
        parenthesised value are balanced, so nested expressions stay in
        one value.  A delimited value that is never closed is discarded.
        Returns an empty dictionary for empty or blank input.
        """
        msg = {}
        text = str.strip()
        if not text:
            return msg
        if text[0] in self._CLOSERS:
            # Drop the delimiters wrapping the whole message
            text = text[1:-1]
        key = None
        closer = None   # closing delimiter while inside a multi-word value
        depth = 0       # open parentheses inside the current value
        parts = []      # words of the current multi-word value
        for element in text.split():
            value = None
            opening = 0
            if closer is None:
                if not key:
                    if element[0] == ':':
                        # We have a new key-value pair
                        key = element[1:]
                        continue
                    # Performative appears without a key
                    key = 'performative'
                    value = element
                elif element[0] in self._CLOSERS:
                    # Beginning delimited value
                    closer = self._CLOSERS[element[0]]
                    depth = 0
                    parts = []
                    opening = 1
                else:
                    # Single-word value
                    value = element
            if closer is not None:
                parts.append(element)
                if closer == ')':
                    depth = depth + element.count('(') - element.count(')')
                    finished = depth <= 0 and element.endswith(')')
                else:
                    # A lone '"' that opens the value cannot also close it
                    finished = element.endswith('"') and \
                        not (opening and len(element) == 1)
                if finished:
                    # Strip the opening and closing delimiter
                    value = ' '.join(parts)[1:-1]
                    closer = None
            if value is not None:
                # Completed one key-value pair
                msg[key] = value
                key = None
        return msg

    def __repr__(self):
        """Return the KQML string form, "(performative :key value ...)".

        Values that contain a space (or are empty) are wrapped in
        parentheses so that parse() reads them back as one value.  A
        missing performative is rendered as an empty word.
        """
        pieces = ['(' + self.get('performative', '')]
        for key, value in self.items():
            if key != 'performative':
                if ' ' in value or not value:
                    value = '(' + value + ')'
                pieces.append(' :' + key + ' ' + value)
        pieces.append(')')
        return ''.join(pieces)
if __name__ == '__main__':
    # Stand-alone mode: start a listener (optionally on the port given as
    # the first command-line argument) and print whatever arrives, once a
    # second, until interrupted from the keyboard.
    import sys
    import time

    args = {}
    if len(sys.argv) > 1:
        args['port'] = int(sys.argv[1])

    me = KQMLListener(args)
    me.start()
    while 1:
        try:
            time.sleep(1)
            print(me.receive())
        except KeyboardInterrupt:
            break
    me.stop()