Package tests :: Module axis1
[hide private]
[frames] | no frames]

Source Code for Module tests.axis1

  1  # This program is free software; you can redistribute it and/or modify 
  2  # it under the terms of the (LGPL) GNU Lesser General Public License as 
  3  # published by the Free Software Foundation; either version 3 of the  
  4  # License, or (at your option) any later version. 
  5  # 
  6  # This program is distributed in the hope that it will be useful, 
  7  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
  8  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  9  # GNU Library Lesser General Public License for more details at 
 10  # ( http://www.gnu.org/licenses/lgpl.html ). 
 11  # 
 12  # You should have received a copy of the GNU Lesser General Public License 
 13  # along with this program; if not, write to the Free Software 
 14  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
 15  # written by: Jeff Ortel ( jortel@redhat.com ) 
 16   
 17  # 
 18  # This test requires installation or visibility to my local axis(1) server. 
 19  # 
 20   
 21  import sys 
 22  sys.path.append('../') 
 23   
 24  import logging 
 25  import traceback as tb 
 26  import suds.metrics as metrics 
 27  from tests import * 
 28  from suds import WebFault 
 29  from suds.client import Client 
 30  from suds.sudsobject import Object 
 31  from suds.transport.https import HttpAuthenticated 
 32  from suds.plugin import * 
 33   
 34  errors = 0 
 35   
 36  credentials = dict(username='jortel', password='abc123') 
 37   
 38  setup_logging() 
 39   
 40   
class MyInitPlugin(InitPlugin):
    """Init plugin that traces the ``initialized`` hook to stdout."""

    def initialized(self, context):
        """Print the attribute dictionary of ``context``.

        NOTE(review): presumably invoked by suds once the client has been
        initialized -- confirm against the suds plugin documentation.
        """
        ctx_state = context.__dict__
        print('PLUGIN (init): initialized: ctx=%s' % ctx_state)
45 46
class MyDocumentPlugin(DocumentPlugin):
    """Document plugin that traces the ``loaded`` and ``parsed`` hooks."""

    def loaded(self, context):
        """Print the attribute dictionary of ``context`` for ``loaded``."""
        ctx_state = context.__dict__
        print('PLUGIN (document): loaded: ctx=%s' % ctx_state)

    def parsed(self, context):
        """Print the attribute dictionary of ``context`` for ``parsed``."""
        ctx_state = context.__dict__
        print('PLUGIN (document): parsed: ctx=%s' % ctx_state)
54 55
class MyMessagePlugin(MessagePlugin):
    """Message plugin that traces every message hook to stdout.

    Each hook prints a line of the form
    ``PLUGIN (message): <hook name>: ctx=<context attributes>``.

    NOTE(review): the point in the request/reply cycle at which suds calls
    each hook is not visible here -- see the suds plugin documentation.
    """

    def marshalled(self, context):
        """Print the attribute dictionary of ``context`` for ``marshalled``."""
        print('PLUGIN (message): marshalled: ctx=%s' % context.__dict__)

    def sending(self, context):
        """Print the attribute dictionary of ``context`` for ``sending``."""
        print('PLUGIN (message): sending: ctx=%s' % context.__dict__)

    def received(self, context):
        """Print the attribute dictionary of ``context`` for ``received``."""
        print('PLUGIN (message): received: ctx=%s' % context.__dict__)

    def parsed(self, context):
        """Print the attribute dictionary of ``context`` for ``parsed``."""
        print('PLUGIN (message): parsed: ctx=%s' % context.__dict__)

    def unmarshalled(self, context):
        """Print the attribute dictionary of ``context`` for ``unmarshalled``."""
        # Message text corrected from 'PLUGIN: (massage)' so that it follows
        # the same 'PLUGIN (message): <hook>' format as the other hooks.
        print('PLUGIN (message): unmarshalled: ctx=%s' % context.__dict__)
# One instance of each tracing plugin, created in this order; handed to the
# first Client built by the test script via ``plugins=myplugins``.
myplugins = tuple(
    plugin_class()
    for plugin_class in (MyInitPlugin, MyDocumentPlugin, MyMessagePlugin)
)


# Uncomment to raise the 'suds.client' logger to DEBUG level.
#logging.getLogger('suds.client').setLevel(logging.DEBUG)

def start(url):
    """Print a banner announcing a test run against ``url``.

    The banner consists of a separator rule followed by the URL and the
    current value of the module-level ``errors`` counter.  The counter is
    only read here, so no ``global`` declaration is required.
    """
    separator = '\n________________________________________________________________\n'
    banner = 'Test @ ( %s )\nerrors = %d\n' % (url, errors)
    print(separator)
    print(banner)

#
# Test script.
#
# Every try-block below talks to the axis(1) service at localhost:8081 and
# bumps the module-level ``errors`` counter when a call fails unexpectedly.
# NOTE: the ``except ... as`` form used here requires Python 2.6 or later.
#

try:
    url = 'http://localhost:8081/axis/services/basic-rpc-encoded?wsdl'
    start(url)
    t = HttpAuthenticated(**credentials)
    client = Client(url, transport=t, cache=None, plugins=myplugins)
    print(client)
    #
    # create a name object using the wsdl
    #
    print('create name')
    name = client.factory.create('ns0:Name')
    # the non-ASCII character exercises unicode handling
    name.first = u'jeff'+unichr(1234)
    name.last = 'ortel'
    print(name)
    #
    # create a phone object using the wsdl
    #
    print('create phone')
    phoneA = client.factory.create('ns0:Phone')
    phoneA.npa = 410
    phoneA.nxx = 555
    phoneA.number = 5138
    phoneB = client.factory.create('ns0:Phone')
    phoneB.npa = 919
    phoneB.nxx = 555
    phoneB.number = 4406
    # a plain dict is mixed in with the factory-created phone objects
    phoneC = {
        'npa':205,
        'nxx':777,
        'number':1212
    }
    #
    # create a dog
    #
    dog = client.factory.create('ns0:Dog')
    dog.name = 'Chance'
    dog.trained = True
    #
    # create a person object using the wsdl
    #
    person = client.factory.create('ns0:Person')
    print('{empty} person=\n%s' % person)
    person.name = name
    person.age = 43
    person.phone = [phoneA,phoneB,phoneC]
    person.pets = [dog]
    print('person=\n%s' % person)
    #
    # add the person (using the webservice)
    #
    print('addPerson()')
    result = client.service.addPerson(person)
    print('\nreply(\n%s\n)\n' % str(result))
    #
    # create a new name object used to update the person
    #
    newname = client.factory.create('ns0:Name')
    newname.first = 'Todd'
    newname.last = None
    #
    # create AnotherPerson using Person
    #
    ap = client.factory.create('ns0:AnotherPerson')
    ap.name = person.name
    ap.age = person.age
    ap.phone = person.phone
    ap.pets = person.pets
    print('AnotherPerson\n%s' % ap)
    #
    # update the person's name (using the webservice)
    #
    print('updatePerson()')
    result = client.service.updatePerson(ap, newname)
    print('\nreply(\n%s\n)\n' % str(result))
    # second update passes None in place of the new name
    result = client.service.updatePerson(ap, None)
    print('\nreply(\n%s\n)\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    url = 'http://localhost:8081/axis/services/basic-rpc-encoded?wsdl'
    start(url)
    t = HttpAuthenticated(**credentials)
    # this client is built without plugins
    client = Client(url, transport=t, cache=None)
    print(client)
    #
    # create a name object as dict
    #
    print('create name')
    name = {}
    name['first'] = 'Elmer'
    name['last'] = 'Fudd'
    print(name)
    #
    # create a phone as dict
    #
    print('create phone')
    phoneA = {}
    phoneA['npa'] = 410
    phoneA['nxx'] = 555
    phoneA['number'] = 5138
    phoneB = {}
    phoneB['npa'] = 919
    phoneB['nxx'] = 555
    phoneB['number'] = 4406
    phoneC = {
        'npa':205,
        'nxx':777,
        'number':1212
    }
    #
    # create a dog
    #
    dog = {
        'name':'Chance',
        'trained':True,
    }
    #
    # create a person as dict
    #
    person = {}
    print('{empty} person=\n%s' % person)
    person['name'] = name
    person['age'] = 43
    person['phone'] = [phoneA,phoneB, phoneC]
    person['pets'] = [dog]
    print('person=\n%s' % person)
    #
    # add the person (using the webservice)
    #
    print('addPerson()')
    result = client.service.addPerson(person)
    print('\nreply(\n%s\n)\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

# The remaining blocks reuse whichever ``client`` was bound most recently.

try:
    print("echo(' this is cool ')")
    result = client.service.echo('this is cool')
    print('\nreply( "%s" )\n' % str(result))
    print('echo(None)')
    result = client.service.echo(None)
    print('\nreply( "%s" )\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    print('hello()')
    result = client.service.hello()
    print('\nreply( %s )\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    print('testVoid()')
    result = client.service.getVoid()
    print('\nreply( %s )\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    print('** new style arrays **')
    words = ['my', 'dog', 'likes', 'steak']
    result = client.service.printList(words)
    print('\nreply( %s )\n' % str(result))

    print('** old style arrays **')
    array = client.factory.create('ArrayOf_xsd_string')
    print('ArrayOf_xsd_string=\n%s' % array)
    array.item = ['my', 'dog', 'likes', 'steak']
    result = client.service.printList(array)
    print('\nreply( %s )\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    s = 'hello'
    for n in range(0, 3):
        print('getList(%s, %d)' % (s, n))
        result = client.service.getList(s, n)
        print('\nreply( %s )\n' % str(result))
        # the reply must be a list holding exactly n entries
        assert ( isinstance(result, list) and len(result) == n )
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    print('testExceptions()')
    result = client.service.throwException()
    print('\nreply( %s )\n' % tostr(result))
    # reaching this line means the expected WebFault was never raised
    raise Exception('Fault expected and not raised')
except WebFault as f:
    # a WebFault is the expected outcome here, so it is not counted
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

try:
    url = 'http://localhost:8081/axis/services/basic-rpc-encoded?wsdl'
    start(url)
    # NOTE(review): with faults=False suds presumably returns the fault in
    # the reply instead of raising WebFault -- confirm in the suds docs.
    client = Client(url, faults=False, **credentials)
    print('testExceptions()')
    result = client.service.throwException()
    print('\nreply( %s )\n' % str(result))
except WebFault as f:
    errors += 1
    print(f)
    print(f.fault)
except Exception as e:
    errors += 1
    print(e)
    tb.print_exc()

print('\nFinished: errors=%d' % errors)