https://t.me/RX1948
Server : Apache/2.4.18 (Ubuntu)
System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64
User : oppastar ( 1041)
PHP Version : 7.0.33-0ubuntu0.16.04.15
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
Directory :  /usr/lib/python2.7/dist-packages/twisted/trial/_dist/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/python2.7/dist-packages/twisted/trial/_dist/test/test_worker.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test for distributed trial worker side.
"""

import os
from cStringIO import StringIO

from zope.interface.verify import verifyObject

from twisted.trial.reporter import TestResult
from twisted.trial.unittest import TestCase
from twisted.trial._dist.worker import (
    LocalWorker, LocalWorkerAMP, LocalWorkerTransport, WorkerProtocol)
from twisted.trial._dist import managercommands, workercommands

from twisted.scripts import trial
from twisted.test.proto_helpers import StringTransport

from twisted.internet.interfaces import ITransport, IAddress
from twisted.internet.defer import fail, succeed
from twisted.internet.main import CONNECTION_DONE
from twisted.internet.error import ConnectionDone
from twisted.python.failure import Failure
from twisted.protocols.amp import AMP



class FakeAMP(AMP):
    """
    A fake amp protocol.
    """



class WorkerProtocolTests(TestCase):
    """
    Tests for L{WorkerProtocol}.
    """

    def setUp(self):
        """
        Set up a transport, a result stream and a protocol instance.
        """
        self.serverTransport = StringTransport()
        self.clientTransport = StringTransport()
        self.server = WorkerProtocol()
        self.server.makeConnection(self.serverTransport)
        self.client = FakeAMP()
        self.client.makeConnection(self.clientTransport)


    def test_run(self):
        """
        Calling the L{workercommands.Run} command on the client returns a
        response with C{success} sets to C{True}.
        """
        d = self.client.callRemote(workercommands.Run, testCase="doesntexist")

        def check(result):
            self.assertTrue(result['success'])

        d.addCallback(check)
        self.server.dataReceived(self.clientTransport.value())
        self.clientTransport.clear()
        self.client.dataReceived(self.serverTransport.value())
        self.serverTransport.clear()
        return d


    def test_start(self):
        """
        The C{start} command changes the current path.
        """
        curdir = os.path.realpath(os.path.curdir)
        self.addCleanup(os.chdir, curdir)
        self.server.start('..')
        self.assertNotEqual(os.path.realpath(os.path.curdir), curdir)



class LocalWorkerAMPTests(TestCase):
    """
    Test case for distributed trial's manager-side local worker AMP protocol
    """

    def setUp(self):
        self.managerTransport = StringTransport()
        self.managerAMP = LocalWorkerAMP()
        self.managerAMP.makeConnection(self.managerTransport)
        self.result = TestResult()
        self.workerTransport = StringTransport()
        self.worker = AMP()
        self.worker.makeConnection(self.workerTransport)

        config = trial.Options()
        self.testName = "twisted.doesnexist"
        config['tests'].append(self.testName)
        self.testCase = trial._getSuite(config)._tests.pop()

        self.managerAMP.run(self.testCase, self.result)
        self.managerTransport.clear()


    def pumpTransports(self):
        """
        Sends data from C{self.workerTransport} to C{self.managerAMP}, and then
        data from C{self.managerTransport} back to C{self.worker}.
        """
        self.managerAMP.dataReceived(self.workerTransport.value())
        self.workerTransport.clear()
        self.worker.dataReceived(self.managerTransport.value())


    def test_runSuccess(self):
        """
        Run a test, and succeed.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddSuccess,
                                   testName=self.testName)
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertTrue(results)


    def test_runExpectedFailure(self):
        """
        Run a test, and fail expectedly.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddExpectedFailure,
                                   testName=self.testName, error='error',
                                   todo='todoReason')
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.expectedFailures[0][0])
        self.assertTrue(results)


    def test_runError(self):
        """
        Run a test, and encounter an error.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddError,
                                   testName=self.testName, error='error',
                                   errorClass='exceptions.ValueError',
                                   frames=[])
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.errors[0][0])
        self.assertTrue(results)


    def test_runErrorWithFrames(self):
        """
        L{LocalWorkerAMP._buildFailure} recreates the C{Failure.frames} from
        the C{frames} argument passed to C{AddError}.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddError,
                                   testName=self.testName, error='error',
                                   errorClass='exceptions.ValueError',
                                   frames=["file.py", "invalid code", "3"])
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.errors[0][0])
        self.assertEqual(
            [('file.py', 'invalid code', 3, [], [])],
            self.result.errors[0][1].frames)
        self.assertTrue(results)


    def test_runFailure(self):
        """
        Run a test, and fail.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddFailure,
                                   testName=self.testName, fail='fail',
                                   failClass='exceptions.RuntimeError',
                                   frames=[])
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.failures[0][0])
        self.assertTrue(results)


    def test_runSkip(self):
        """
        Run a test, but skip it.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddSkip,
                                   testName=self.testName, reason='reason')
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.skips[0][0])
        self.assertTrue(results)


    def test_runUnexpectedSuccesses(self):
        """
        Run a test, and succeed unexpectedly.
        """
        results = []

        d = self.worker.callRemote(managercommands.AddUnexpectedSuccess,
                                   testName=self.testName,
                                   todo='todo')
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual(self.testCase, self.result.unexpectedSuccesses[0][0])
        self.assertTrue(results)


    def test_testWrite(self):
        """
        L{LocalWorkerAMP.testWrite} writes the data received to its test
        stream.
        """
        results = []
        stream = StringIO()
        self.managerAMP.setTestStream(stream)


        d = self.worker.callRemote(managercommands.TestWrite,
                                   out="Some output")
        d.addCallback(lambda result: results.append(result['success']))
        self.pumpTransports()

        self.assertEqual("Some output\n", stream.getvalue())
        self.assertTrue(results)


    def test_stopAfterRun(self):
        """
        L{LocalWorkerAMP.run} calls C{stopTest} on its test result once the
        C{Run} commands has succeeded.
        """
        result = object()
        stopped = []

        def fakeCallRemote(command, testCase):
            return succeed(result)

        self.managerAMP.callRemote = fakeCallRemote

        class StopTestResult(TestResult):

            def stopTest(self, test):
                stopped.append(test)


        d = self.managerAMP.run(self.testCase, StopTestResult())
        self.assertEqual([self.testCase], stopped)
        return d.addCallback(self.assertIdentical, result)



class FakeAMProtocol(AMP):
    """
    A fake implementation of L{AMP} for testing.
    """
    id = 0
    dataString = ""

    def dataReceived(self, data):
        self.dataString += data


    def setTestStream(self, stream):
        self.testStream = stream



class FakeTransport(object):
    """
    A fake process transport implementation for testing.
    """
    dataString = ""
    calls = 0

    def writeToChild(self, fd, data):
        self.dataString += data


    def loseConnection(self):
        self.calls += 1



class LocalWorkerTests(TestCase):
    """
    Tests for L{LocalWorker} and L{LocalWorkerTransport}.
    """

    def test_childDataReceived(self):
        """
        L{LocalWorker.childDataReceived} forwards the received data to linked
        L{AMP} protocol if the right file descriptor, otherwise forwards to
        C{ProcessProtocol.childDataReceived}.
        """
        fakeTransport = FakeTransport()
        localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
        localWorker.makeConnection(fakeTransport)
        localWorker._outLog = StringIO()
        localWorker.childDataReceived(4, "foo")
        localWorker.childDataReceived(1, "bar")
        self.assertEqual("foo", localWorker._ampProtocol.dataString)
        self.assertEqual("bar", localWorker._outLog.getvalue())


    def test_outReceived(self):
        """
        L{LocalWorker.outReceived} logs the output into its C{_outLog} log
        file.
        """
        fakeTransport = FakeTransport()
        localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
        localWorker.makeConnection(fakeTransport)
        localWorker._outLog = StringIO()
        data = "The quick brown fox jumps over the lazy dog"
        localWorker.outReceived(data)
        self.assertEqual(data, localWorker._outLog.getvalue())


    def test_errReceived(self):
        """
        L{LocalWorker.errReceived} logs the errors into its C{_errLog} log
        file.
        """
        fakeTransport = FakeTransport()
        localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
        localWorker.makeConnection(fakeTransport)
        localWorker._errLog = StringIO()
        data = "The quick brown fox jumps over the lazy dog"
        localWorker.errReceived(data)
        self.assertEqual(data, localWorker._errLog.getvalue())


    def test_write(self):
        """
        L{LocalWorkerTransport.write} forwards the written data to the given
        transport.
        """
        transport = FakeTransport()
        localTransport = LocalWorkerTransport(transport)
        data = "The quick brown fox jumps over the lazy dog"
        localTransport.write(data)
        self.assertEqual(data, transport.dataString)


    def test_writeSequence(self):
        """
        L{LocalWorkerTransport.writeSequence} forwards the written data to the
        given transport.
        """
        transport = FakeTransport()
        localTransport = LocalWorkerTransport(transport)
        data = ("The quick ", "brown fox jumps ", "over the lazy dog")
        localTransport.writeSequence(data)
        self.assertEqual("".join(data), transport.dataString)


    def test_loseConnection(self):
        """
        L{LocalWorkerTransport.loseConnection} forwards the call to the given
        transport.
        """
        transport = FakeTransport()
        localTransport = LocalWorkerTransport(transport)
        localTransport.loseConnection()

        self.assertEqual(transport.calls, 1)


    def test_connectionLost(self):
        """
        L{LocalWorker.connectionLost} closes the log streams.
        """

        class FakeStream(object):
            callNumber = 0

            def close(self):
                self.callNumber += 1


        transport = FakeTransport()
        localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
        localWorker.makeConnection(transport)
        localWorker._outLog = FakeStream()
        localWorker._errLog = FakeStream()
        localWorker.connectionLost(None)
        self.assertEqual(localWorker._outLog.callNumber, 1)
        self.assertEqual(localWorker._errLog.callNumber, 1)


    def test_processEnded(self):
        """
        L{LocalWorker.processEnded} calls C{connectionLost} on itself and on
        the L{AMP} protocol.
        """

        class FakeStream(object):
            callNumber = 0

            def close(self):
                self.callNumber += 1


        transport = FakeTransport()
        protocol = FakeAMProtocol()
        localWorker = LocalWorker(protocol, '.', 'test.log')
        localWorker.makeConnection(transport)
        localWorker._outLog = FakeStream()
        localWorker.processEnded(Failure(CONNECTION_DONE))
        self.assertEqual(localWorker._outLog.callNumber, 1)
        self.assertIdentical(None, protocol.transport)
        return self.assertFailure(localWorker.endDeferred, ConnectionDone)

    def test_addresses(self):
        """
        L{LocalWorkerTransport.getPeer} and L{LocalWorkerTransport.getHost}
        return L{IAddress} objects.
        """
        localTransport = LocalWorkerTransport(None)
        self.assertTrue(verifyObject(IAddress, localTransport.getPeer()))
        self.assertTrue(verifyObject(IAddress, localTransport.getHost()))


    def test_transport(self):
        """
        L{LocalWorkerTransport} implements L{ITransport} to be able to be used
        by L{AMP}.
        """
        localTransport = LocalWorkerTransport(None)
        self.assertTrue(verifyObject(ITransport, localTransport))


    def test_startError(self):
        """
        L{LocalWorker} swallows the exceptions returned by the L{AMP} protocol
        start method, as it generates unnecessary errors.
        """

        def failCallRemote(command, directory):
            return fail(RuntimeError("oops"))

        transport = FakeTransport()
        protocol = FakeAMProtocol()
        protocol.callRemote = failCallRemote
        localWorker = LocalWorker(protocol, '.', 'test.log')
        localWorker.makeConnection(transport)

        self.assertEqual([], self.flushLoggedErrors(RuntimeError))

https://t.me/RX1948 - 2025