https://t.me/RX1948
Server : Apache/2.4.18 (Ubuntu)
System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64
User : oppastar ( 1041)
PHP Version : 7.0.33-0ubuntu0.16.04.15
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
Directory :  /usr/lib/python2.7/dist-packages/twisted/web/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/python2.7/dist-packages/twisted/web/test/test_soap.py
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
#

"""Test SOAP support."""

# SOAPpy is treated as optional here: if it cannot be imported, SOAPpy is
# bound to None and a do-nothing SOAPPublisher stub is defined so that the
# Test class later in this module can still be declared at import time.
try:
    import SOAPpy
except ImportError:
    SOAPpy = None
    class SOAPPublisher: pass
else:
    # Only reached when SOAPpy imported cleanly; use the real publisher base.
    from twisted.web import soap
    SOAPPublisher = soap.SOAPPublisher

from twisted.trial import unittest
from twisted.web import server, error
from twisted.internet import reactor, defer


class Test(SOAPPublisher):
    """
    Publisher fixture whose C{soap_*} methods are invoked remotely by the
    test case in this module.
    """

    def soap_add(self, a, b):
        """Return C{a + b}."""
        total = a + b
        return total

    def soap_kwargs(self, a=1, b=2):
        """Return C{a + b}; both arguments have defaults (1 and 2)."""
        total = a + b
        return total
    # Function attribute set on the method object itself; presumably read by
    # the SOAP publisher to decide how arguments are passed -- TODO confirm.
    soap_kwargs.useKeywords = True

    def soap_triple(self, string, num):
        """Return a three-item list: C{string}, C{num} and C{None}."""
        triple = [string, num, None]
        return triple

    def soap_struct(self):
        """Return a C{SOAPpy.structType} built from C{{"a": "c"}}."""
        fields = {"a": "c"}
        return SOAPpy.structType(fields)

    def soap_defer(self, x):
        """Return a Deferred that has already succeeded with C{x}."""
        fired = defer.succeed(x)
        return fired

    def soap_deferFail(self):
        """Return a Deferred that has already failed with a C{ValueError}."""
        problem = ValueError()
        return defer.fail(problem)

    def soap_fail(self):
        """Raise C{RuntimeError} synchronously."""
        raise RuntimeError()

    def soap_deferFault(self):
        """Return a Deferred that has already failed with a C{ValueError}."""
        problem = ValueError()
        return defer.fail(problem)

    def soap_complex(self):
        """Return a dict holding a mixed list (with a nested empty list)."""
        mixed = ["b", "c", 12, []]
        return {"a": mixed, "D": "foo"}

    def soap_dict(self, map, key):
        """Return the value stored under C{key} in C{map}."""
        value = map[key]
        return value


class SOAPTests(unittest.TestCase):
    """
    Run the L{Test} publisher behind a TCP listener on 127.0.0.1 and call
    its methods through a SOAP proxy.
    """

    def setUp(self):
        """
        Start listening with a fresh L{Test} publisher and record the port.
        """
        self.publisher = Test()
        site = server.Site(self.publisher)
        # Port 0 is requested; the port actually bound is read back below.
        self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
        address = self.p.getHost()
        self.port = address.port

    def tearDown(self):
        """
        Stop the listener; the result of C{stopListening} is returned.
        """
        stopping = self.p.stopListening()
        return stopping

    def proxy(self):
        """
        Build a new SOAP proxy aimed at the listener started in C{setUp}.
        """
        url = "http://127.0.0.1:%d/" % self.port
        return soap.Proxy(url)

    def testResults(self):
        """
        Call several remote methods and compare each result with the
        expected value.
        """
        # (remote method name, positional arguments, expected result)
        expectations = (
            ("add", (2, 3), 5),
            ("defer", ("a",), "a"),
            ("dict", ({"a": 1}, "a"), 1),
            ("triple", ("a", 1), ["a", 1, None]),
        )

        pending = []
        for name, params, expected in expectations:
            call = self.proxy().callRemote(name, *params)
            call.addCallback(self.assertEqual, expected)
            pending.append(call)

        def toDict(struct):
            return struct._asdict()

        # The 'complex' result is not compared directly: it is first
        # converted with its _asdict() method, then checked.
        complexCall = self.proxy().callRemote('complex')
        complexCall.addCallback(toDict)
        complexCall.addCallback(
            self.assertEqual, {"a": ["b", "c", 12, []], "D": "foo"})
        pending.append(complexCall)

        # Fire as an error as soon as any one of the calls fails.
        return defer.DeferredList(pending, fireOnOneErrback=True)

    def testMethodNotFound(self):
        """
        Calling a method the publisher does not have fails with an
        C{error.Error} whose status is 500.
        """
        def checkStatus(err):
            self.assertEqual(int(err.status), 500)

        failed = self.assertFailure(
            self.proxy().callRemote('doesntexist'), error.Error)
        failed.addCallback(checkStatus)
        return failed

    def testLookupFunction(self):
        """
        C{lookupFunction} on the publisher gives a true value for the
        names "add" and "fail" and a false value for "foobar".
        """
        lookup = self.publisher.lookupFunction
        for known in ("add", "fail"):
            self.assertTrue(lookup(known))
        self.assertFalse(lookup("foobar"))

# SOAPpy is None when the guarded import at the top of this module failed.
# In that case mark the whole test case with a ``skip`` reason (the
# class-level attribute trial uses to skip tests) instead of letting the
# tests run without their dependency.
if not SOAPpy:
    SOAPTests.skip = "SOAPpy not installed"


https://t.me/RX1948 - 2025