Package teamwork :: Package agent :: Module Agent
[hide private]
[frames] | no frames]

Source Code for Module teamwork.agent.Agent

  1  """Generic class for defining an agent and its capabilities 
  2  @author: David V. Pynadath <pynadath@isi.edu> 
  3  """ 
  4   
  5  from xml.dom.minidom import * 
  6  from teamwork.action.PsychActions import * 
  7  from teamwork.action.DecisionSpace import * 
  8  from teamwork.math.Keys import * 
  9  from teamwork.math.ProbabilityTree import ProbabilityTree 
 10   
11 -class Agent:
12 """Top-level agent class 13 @cvar actionClass: the Python class for this agent's option instances 14 @type actionClass: L{Action} subclass (default is L{action}) 15 @ivar name: the unique label for this agent 16 @type name: string 17 @ivar actions: the space of possible actions this agent can perform 18 @type actions: L{DecisionSpace} 19 @ivar omega: set of possible observations 20 """ 21 actionClass = Action 22
23 - def __init__(self,name='Generic Agent'):
24 """ 25 @param name: a label string 26 @type name: str 27 """ 28 self.setName(name) 29 # A_i 30 self.actions = DecisionSpace() 31 # Omega_i (defaults to null observation and perfect observation of actions) 32 self.omega = {None: True, True: True} 33 # O_i (defaults to perfect observability) 34 self.observations = [] 35 # Sigma_i 36 self.messages = [] 37 # B_i^0 38 self.beliefs = self.initialStateEstimator() 39 # Extra attributes 40 self.attributes = {}
41
42 - def setName(self,name):
43 """Sets the name of this agent 44 @param name: the unique ID for this agent 45 @type name: C{str} 46 """ 47 self.name = name
48
49 - def legalActions(self,state):
50 """Default method for specifying the set (i.e., list) of 51 actions that this agent can choose from in the given state""" 52 return self.actions[:]
53
54 - def legalMessages(self,state):
55 """Default method for specifying the set (i.e., list) of 56 messages that this agent can choose from in the given state""" 57 return self.messages[:]
58
59 - def generateAllObservations(self):
60 try: 61 return self.allObservations 62 except AttributeError: 63 self.allObservations = [{}] 64 for key in self.omega.keys(): 65 for omega in self.allObservations[:]: 66 self.allObservations.remove(omega) 67 for value in self.observations[key]: 68 newOmega = copy.copy(omega) 69 newOmega[key] = value 70 self.allObservations.append(newOmega) 71 return self.allObservations
72
73 - def generateHistories(self,length):
74 result = [[]] 75 for t in range(length): 76 # Generate possible observations 77 for history in result[:]: 78 result.remove(history) 79 for omega in self.generateAllObservations(): 80 newHistory = copy.copy(history) 81 newHistory.append({'type':'observation', 82 'value':omega}) 83 result.append(newHistory) 84 # Generate possible actions 85 for history in result[:]: 86 result.remove(history) 87 for action in self.actions: 88 newHistory = copy.copy(history) 89 newHistory.append({'type':'action', 90 'value':action}) 91 result.append(newHistory) 92 return result
93 94 # The state estimators. These default methods implement "perfect 95 # recall" 96 97 # SE^0_i
98 - def initialStateEstimator(self):
99 """Generates the initial belief state for the specified agent""" 100 return []
101 102 # SE_i\bullet\Sigma
103 - def preComStateEstimator(self,beliefs,obs,epoch=-1):
104 """Updates the agent's beliefs based on its observations""" 105 newBeliefs = beliefs[:] 106 newBeliefs.insert(0,obs) 107 if epoch >= 0: 108 newBeliefs[0]['_epoch'] = epoch 109 return newBeliefs
110 111 # SE_i\Sigma\bullet
112 - def postComStateEstimator(self,beliefs,msgs,epoch=-1):
113 """Updates the agent's beliefs based on received messages""" 114 newBeliefs = beliefs[:] 115 newBeliefs.insert(0,msgs) 116 if epoch >= 0: 117 newBeliefs[0]['_epoch'] = epoch 118 return newBeliefs
119
120 - def observable(self):
121 """ 122 @return: C{True} iff this agent has perfect observability of all actions 123 @rtype: bool 124 @warning: This is a superficial check that the observation function specifies C{True} for all action combinations. It will return C{False} if there are branches, non-C{True} leaf nodes, I{even} if the end result is equivalent to perfect observability 125 """ 126 if len(self.observations) == 0: 127 # Nothing is observable 128 return True 129 for entry in self.observations: 130 if len(entry['actions']) > 0: 131 # At least one action required 132 return False 133 elif not entry['actions'].only: 134 # No actions allowed 135 return False 136 elif not entry['tree'].isLeaf(): 137 # A branch in the function 138 return False 139 elif not entry['tree'].getValue() is True: 140 # Something other than perfect observability 141 return False 142 else: 143 # Our observation function passed all tests 144 return True
145
146 - def __cmp__(self,agent):
147 """Default comparison function...treats all agents equally""" 148 return 0
149
150 - def __copy__(self):
151 new = self.__class__(name=self.name) 152 new.messages = self.messages 153 new.actions = self.actions 154 new.omega.update(self.omega) 155 return new
156
157 - def __xml__(self):
158 doc = Document() 159 root = doc.createElement('agent') 160 doc.appendChild(root) 161 doc.documentElement.setAttribute('name',self.name) 162 # Observation symbols 163 element = doc.createElement('observations') 164 for key,value in self.omega.items(): 165 if isinstance(key,Key): 166 # Don't bother adding special observations (C{None} and C{True}) 167 element.appendChild(key.__xml__().documentElement) 168 # Observation function 169 for entry in self.observations: 170 node = doc.createElement('O') 171 for key,value in entry.items(): 172 if key == 'actions': 173 node.appendChild(value.__xml__().documentElement) 174 elif key == 'tree': 175 node.appendChild(value.__xml__().documentElement) 176 else: 177 node.setAttribute(key,str(value)) 178 element.appendChild(node) 179 root.appendChild(element) 180 # Miscellaneous attributes 181 for key,value in self.attributes.items(): 182 if key == 'image': 183 pass 184 elif key == 'coords': 185 coords = self.attributes['coords'] 186 doc.documentElement.setAttribute('x0',str(coords[0])) 187 doc.documentElement.setAttribute('y0',str(coords[1])) 188 doc.documentElement.setAttribute('x1',str(coords[2])) 189 doc.documentElement.setAttribute('y1',str(coords[3])) 190 elif isinstance(value,str): 191 doc.documentElement.setAttribute(key,value) 192 return doc
193
194 - def parse(self,element):
195 assert(element.tagName == 'agent') 196 self.setName(str(element.getAttribute('name'))) 197 arg = str(element.getAttribute('image')).strip() 198 if not arg: 199 arg = str(element.getAttribute('imageName')).strip() 200 if arg: 201 self.attributes['imageName'] = arg 202 arg = str(element.getAttribute('x0')).strip() 203 if arg: 204 coords = [float(arg), 205 float(element.getAttribute('y0')), 206 float(element.getAttribute('x1')), 207 float(element.getAttribute('y1')), 208 ] 209 self.attributes['coords'] = coords 210 # Parse observations 211 child = element.firstChild 212 while child: 213 if child.nodeType == child.ELEMENT_NODE: 214 if child.tagName == 'observations': 215 subchild = child.firstChild 216 while subchild: 217 if subchild.nodeType == child.ELEMENT_NODE: 218 if subchild.tagName == 'key': 219 key = Key() 220 key = key.parse(subchild) 221 self.omega[key] = True 222 elif subchild.tagName == 'O': 223 entry = {'actions': ActionCondition(), 224 'tree': ProbabilityTree()} 225 for key,value in subchild.attributes.items(): 226 entry[str(key)] = bool(value) 227 node = subchild.firstChild 228 while node: 229 if node.nodeType == node.ELEMENT_NODE and \ 230 node.tagName == 'condition': 231 entry['actions'].parse(node) 232 elif node.nodeType == node.ELEMENT_NODE and \ 233 node.tagName == 'tree': 234 entry['tree'].parse(node,str) 235 node = node.nextSibling 236 self.observations.append(entry) 237 subchild = subchild.nextSibling 238 child = child.nextSibling
239 240 if __name__ == '__main__': 241 import os.path 242 243 name = '/tmp/%s.xml' % (os.path.basename(__file__)) 244 file = open(name,'w') 245 file.write(agent.__xml__().toxml()) 246 file.close() 247 248 new = Agent() 249 new.parse(parse(name)) 250 print new 251