Server : Apache/2.4.18 (Ubuntu) System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64 User : oppastar ( 1041) PHP Version : 7.0.33-0ubuntu0.16.04.15 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, Directory : /usr/lib/python2.7/dist-packages/twisted/words/test/ |
Upload File : |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for L{twisted.words.protocols.jabber.client} """ from hashlib import sha1 from twisted.internet import defer from twisted.trial import unittest from twisted.words.protocols.jabber import client, error, jid, xmlstream from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer from twisted.words.xish import utility try: from twisted.internet import ssl except ImportError: ssl = None skipWhenNoSSL = "SSL not available" else: skipWhenNoSSL = None IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]' IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]' NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session' IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION class CheckVersionInitializerTests(unittest.TestCase): def setUp(self): a = xmlstream.Authenticator() xs = xmlstream.XmlStream(a) self.init = client.CheckVersionInitializer(xs) def testSupported(self): """ Test supported version number 1.0 """ self.init.xmlstream.version = (1, 0) self.init.initialize() def testNotSupported(self): """ Test unsupported version number 0.0, and check exception. """ self.init.xmlstream.version = (0, 0) exc = self.assertRaises(error.StreamError, self.init.initialize) self.assertEqual('unsupported-version', exc.condition) class InitiatingInitializerHarness(object): """ Testing harness for interacting with XML stream initializers. This sets up an L{utility.XmlPipe} to create a communication channel between the initializer and the stubbed receiving entity. It features a sink and source side that both act similarly to a real L{xmlstream.XmlStream}. The sink is augmented with an authenticator to which initializers can be added. The harness also provides some utility methods to work with event observers and deferreds. """ def setUp(self): self.output = [] self.pipe = utility.XmlPipe() self.xmlstream = self.pipe.sink self.authenticator = xmlstream.ConnectAuthenticator('example.org') self.xmlstream.authenticator = self.authenticator def waitFor(self, event, handler): """ Observe an output event, returning a deferred. The returned deferred will be fired when the given event has been observed on the source end of the L{XmlPipe} tied to the protocol under test. The handler is added as the first callback. @param event: The event to be observed. See L{utility.EventDispatcher.addOnetimeObserver}. @param handler: The handler to be called with the observed event object. @rtype: L{defer.Deferred}. """ d = defer.Deferred() d.addCallback(handler) self.pipe.source.addOnetimeObserver(event, d.callback) return d class IQAuthInitializerTests(InitiatingInitializerHarness, unittest.TestCase): """ Tests for L{client.IQAuthInitializer}. """ def setUp(self): super(IQAuthInitializerTests, self).setUp() self.init = client.IQAuthInitializer(self.xmlstream) self.authenticator.jid = jid.JID('user@example.com/resource') self.authenticator.password = 'secret' def testPlainText(self): """ Test plain-text authentication. Act as a server supporting plain-text authentication and expect the C{password} field to be filled with the password. Then act as if authentication succeeds. """ def onAuthGet(iq): """ Called when the initializer sent a query for authentication methods. The response informs the client that plain-text authentication is supported. """ # Create server response response = xmlstream.toResponse(iq, 'result') response.addElement(('jabber:iq:auth', 'query')) response.query.addElement('username') response.query.addElement('password') response.query.addElement('resource') # Set up an observer for the next request we expect. d = self.waitFor(IQ_AUTH_SET, onAuthSet) # Send server response self.pipe.source.send(response) return d def onAuthSet(iq): """ Called when the initializer sent the authentication request. The server checks the credentials and responds with an empty result signalling success. """ self.assertEqual('user', unicode(iq.query.username)) self.assertEqual('secret', unicode(iq.query.password)) self.assertEqual('resource', unicode(iq.query.resource)) # Send server response response = xmlstream.toResponse(iq, 'result') self.pipe.source.send(response) # Set up an observer for the request for authentication fields d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) # Start the initializer d2 = self.init.initialize() return defer.gatherResults([d1, d2]) def testDigest(self): """ Test digest authentication. Act as a server supporting digest authentication and expect the C{digest} field to be filled with a sha1 digest of the concatenated stream session identifier and password. Then act as if authentication succeeds. """ def onAuthGet(iq): """ Called when the initializer sent a query for authentication methods. The response informs the client that digest authentication is supported. """ # Create server response response = xmlstream.toResponse(iq, 'result') response.addElement(('jabber:iq:auth', 'query')) response.query.addElement('username') response.query.addElement('digest') response.query.addElement('resource') # Set up an observer for the next request we expect. d = self.waitFor(IQ_AUTH_SET, onAuthSet) # Send server response self.pipe.source.send(response) return d def onAuthSet(iq): """ Called when the initializer sent the authentication request. The server checks the credentials and responds with an empty result signalling success. """ self.assertEqual('user', unicode(iq.query.username)) self.assertEqual(sha1('12345secret').hexdigest(), unicode(iq.query.digest).encode('utf-8')) self.assertEqual('resource', unicode(iq.query.resource)) # Send server response response = xmlstream.toResponse(iq, 'result') self.pipe.source.send(response) # Digest authentication relies on the stream session identifier. Set it. self.xmlstream.sid = u'12345' # Set up an observer for the request for authentication fields d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) # Start the initializer d2 = self.init.initialize() return defer.gatherResults([d1, d2]) def testFailRequestFields(self): """ Test initializer failure of request for fields for authentication. """ def onAuthGet(iq): """ Called when the initializer sent a query for authentication methods. The server responds that the client is not authorized to authenticate. """ response = error.StanzaError('not-authorized').toResponse(iq) self.pipe.source.send(response) # Set up an observer for the request for authentication fields d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) # Start the initializer d2 = self.init.initialize() # The initialized should fail with a stanza error. self.assertFailure(d2, error.StanzaError) return defer.gatherResults([d1, d2]) def testFailAuth(self): """ Test initializer failure to authenticate. """ def onAuthGet(iq): """ Called when the initializer sent a query for authentication methods. The response informs the client that plain-text authentication is supported. """ # Send server response response = xmlstream.toResponse(iq, 'result') response.addElement(('jabber:iq:auth', 'query')) response.query.addElement('username') response.query.addElement('password') response.query.addElement('resource') # Set up an observer for the next request we expect. d = self.waitFor(IQ_AUTH_SET, onAuthSet) # Send server response self.pipe.source.send(response) return d def onAuthSet(iq): """ Called when the initializer sent the authentication request. The server checks the credentials and responds with a not-authorized stanza error. """ response = error.StanzaError('not-authorized').toResponse(iq) self.pipe.source.send(response) # Set up an observer for the request for authentication fields d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) # Start the initializer d2 = self.init.initialize() # The initializer should fail with a stanza error. self.assertFailure(d2, error.StanzaError) return defer.gatherResults([d1, d2]) class BindInitializerTests(InitiatingInitializerHarness, unittest.TestCase): """ Tests for L{client.BindInitializer}. """ def setUp(self): super(BindInitializerTests, self).setUp() self.init = client.BindInitializer(self.xmlstream) self.authenticator.jid = jid.JID('user@example.com/resource') def testBasic(self): """ Set up a stream, and act as if resource binding succeeds. """ def onBind(iq): response = xmlstream.toResponse(iq, 'result') response.addElement((NS_BIND, 'bind')) response.bind.addElement('jid', content='user@example.com/other resource') self.pipe.source.send(response) def cb(result): self.assertEqual(jid.JID('user@example.com/other resource'), self.authenticator.jid) d1 = self.waitFor(IQ_BIND_SET, onBind) d2 = self.init.start() d2.addCallback(cb) return defer.gatherResults([d1, d2]) def testFailure(self): """ Set up a stream, and act as if resource binding fails. """ def onBind(iq): response = error.StanzaError('conflict').toResponse(iq) self.pipe.source.send(response) d1 = self.waitFor(IQ_BIND_SET, onBind) d2 = self.init.start() self.assertFailure(d2, error.StanzaError) return defer.gatherResults([d1, d2]) class SessionInitializerTests(InitiatingInitializerHarness, unittest.TestCase): """ Tests for L{client.SessionInitializer}. """ def setUp(self): super(SessionInitializerTests, self).setUp() self.init = client.SessionInitializer(self.xmlstream) def testSuccess(self): """ Set up a stream, and act as if session establishment succeeds. """ def onSession(iq): response = xmlstream.toResponse(iq, 'result') self.pipe.source.send(response) d1 = self.waitFor(IQ_SESSION_SET, onSession) d2 = self.init.start() return defer.gatherResults([d1, d2]) def testFailure(self): """ Set up a stream, and act as if session establishment fails. """ def onSession(iq): response = error.StanzaError('forbidden').toResponse(iq) self.pipe.source.send(response) d1 = self.waitFor(IQ_SESSION_SET, onSession) d2 = self.init.start() self.assertFailure(d2, error.StanzaError) return defer.gatherResults([d1, d2]) class BasicAuthenticatorTests(unittest.TestCase): """ Test for both BasicAuthenticator and basicClientFactory. """ def test_basic(self): """ Authenticator and stream are properly constructed by the factory. The L{xmlstream.XmlStream} protocol created by the factory has the new L{client.BasicAuthenticator} instance in its C{authenticator} attribute. It is set up with C{jid} and C{password} as passed to the factory, C{otherHost} taken from the client JID. The stream futher has two initializers, for TLS and authentication, of which the first has its C{required} attribute set to C{True}. """ self.client_jid = jid.JID('user@example.com/resource') # Get an XmlStream instance. Note that it gets initialized with the # XMPPAuthenticator (that has its associateWithXmlStream called) that # is in turn initialized with the arguments to the factory. xs = client.basicClientFactory(self.client_jid, 'secret').buildProtocol(None) # test authenticator's instance variables self.assertEqual('example.com', xs.authenticator.otherHost) self.assertEqual(self.client_jid, xs.authenticator.jid) self.assertEqual('secret', xs.authenticator.password) # test list of initializers tls, auth = xs.initializers self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer) self.assertIsInstance(auth, client.IQAuthInitializer) self.assertFalse(tls.required) class XMPPAuthenticatorTests(unittest.TestCase): """ Test for both XMPPAuthenticator and XMPPClientFactory. """ def test_basic(self): """ Test basic operations. Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let it produce a protocol instance. Then inspect the instance variables of the authenticator and XML stream objects. """ self.client_jid = jid.JID('user@example.com/resource') # Get an XmlStream instance. Note that it gets initialized with the # XMPPAuthenticator (that has its associateWithXmlStream called) that # is in turn initialized with the arguments to the factory. xs = client.XMPPClientFactory(self.client_jid, 'secret').buildProtocol(None) # test authenticator's instance variables self.assertEqual('example.com', xs.authenticator.otherHost) self.assertEqual(self.client_jid, xs.authenticator.jid) self.assertEqual('secret', xs.authenticator.password) # test list of initializers version, tls, sasl, bind, session = xs.initializers self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer)) self.assert_(isinstance(sasl, SASLInitiatingInitializer)) self.assert_(isinstance(bind, client.BindInitializer)) self.assert_(isinstance(session, client.SessionInitializer)) self.assertTrue(tls.required) self.assertTrue(sasl.required) self.assertTrue(bind.required) self.assertFalse(session.required) def test_tlsConfiguration(self): """ A TLS configuration is passed to the TLS initializer. """ configs = [] def init(self, xs, required=True, configurationForTLS=None): configs.append(configurationForTLS) self.client_jid = jid.JID('user@example.com/resource') # Get an XmlStream instance. Note that it gets initialized with the # XMPPAuthenticator (that has its associateWithXmlStream called) that # is in turn initialized with the arguments to the factory. configurationForTLS = ssl.CertificateOptions() factory = client.XMPPClientFactory( self.client_jid, 'secret', configurationForTLS=configurationForTLS) self.patch(xmlstream.TLSInitiatingInitializer, "__init__", init) xs = factory.buildProtocol(None) # test list of initializers version, tls, sasl, bind, session = xs.initializers self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer) self.assertIs(configurationForTLS, configs[0]) test_tlsConfiguration.skip = skipWhenNoSSL