https://t.me/RX1948
Server : Apache/2.4.18 (Ubuntu)
System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64
User : oppastar ( 1041)
PHP Version : 7.0.33-0ubuntu0.16.04.15
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
Directory :  /usr/lib/python2.7/dist-packages/twisted/conch/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/python2.7/dist-packages/twisted/conch/test/test_insults.py
# -*- test-case-name: twisted.conch.test.test_insults -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.python.reflect import namedAny
from twisted.trial import unittest
from twisted.test.proto_helpers import StringTransport

from twisted.conch.insults.insults import ServerProtocol, ClientProtocol
from twisted.conch.insults.insults import CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE_SPECIAL
from twisted.conch.insults.insults import G0, G1
from twisted.conch.insults.insults import modes

def _getattr(mock, name):
    return super(Mock, mock).__getattribute__(name)

def occurrences(mock):
    return _getattr(mock, 'occurrences')

def methods(mock):
    return _getattr(mock, 'methods')

def _append(mock, obj):
    occurrences(mock).append(obj)

default = object()

class Mock(object):
    callReturnValue = default

    def __init__(self, methods=None, callReturnValue=default):
        """
        @param methods: Mapping of names to return values
        @param callReturnValue: object __call__ should return
        """
        self.occurrences = []
        if methods is None:
            methods = {}
        self.methods = methods
        if callReturnValue is not default:
            self.callReturnValue = callReturnValue

    def __call__(self, *a, **kw):
        returnValue = _getattr(self, 'callReturnValue')
        if returnValue is default:
            returnValue = Mock()
        # _getattr(self, 'occurrences').append(('__call__', returnValue, a, kw))
        _append(self, ('__call__', returnValue, a, kw))
        return returnValue

    def __getattribute__(self, name):
        methods = _getattr(self, 'methods')
        if name in methods:
            attrValue = Mock(callReturnValue=methods[name])
        else:
            attrValue = Mock()
        # _getattr(self, 'occurrences').append((name, attrValue))
        _append(self, (name, attrValue))
        return attrValue

class MockMixin:
    def assertCall(self, occurrence, methodName, expectedPositionalArgs=(),
                   expectedKeywordArgs={}):
        attr, mock = occurrence
        self.assertEqual(attr, methodName)
        self.assertEqual(len(occurrences(mock)), 1)
        [(call, result, args, kw)] = occurrences(mock)
        self.assertEqual(call, "__call__")
        self.assertEqual(args, expectedPositionalArgs)
        self.assertEqual(kw, expectedKeywordArgs)
        return result


_byteGroupingTestTemplate = """\
def testByte%(groupName)s(self):
    transport = StringTransport()
    proto = Mock()
    parser = self.protocolFactory(lambda: proto)
    parser.factory = self
    parser.makeConnection(transport)

    bytes = self.TEST_BYTES
    while bytes:
        chunk = bytes[:%(bytesPer)d]
        bytes = bytes[%(bytesPer)d:]
        parser.dataReceived(chunk)

    self.verifyResults(transport, proto, parser)
"""
class ByteGroupingsMixin(MockMixin):
    protocolFactory = None

    for word, n in [('Pairs', 2), ('Triples', 3), ('Quads', 4), ('Quints', 5), ('Sexes', 6)]:
        exec _byteGroupingTestTemplate % {'groupName': word, 'bytesPer': n}
    del word, n

    def verifyResults(self, transport, proto, parser):
        result = self.assertCall(occurrences(proto).pop(0), "makeConnection", (parser,))
        self.assertEqual(occurrences(result), [])

del _byteGroupingTestTemplate

class ServerArrowKeysTests(ByteGroupingsMixin, unittest.TestCase):
    protocolFactory = ServerProtocol

    # All the arrow keys once
    TEST_BYTES = '\x1b[A\x1b[B\x1b[C\x1b[D'

    def verifyResults(self, transport, proto, parser):
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)

        for arrow in (parser.UP_ARROW, parser.DOWN_ARROW,
                      parser.RIGHT_ARROW, parser.LEFT_ARROW):
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (arrow, None))
            self.assertEqual(occurrences(result), [])
        self.assertFalse(occurrences(proto))


class PrintableCharactersTests(ByteGroupingsMixin, unittest.TestCase):
    protocolFactory = ServerProtocol

    # Some letters and digits, first on their own, then capitalized,
    # then modified with alt

    TEST_BYTES = 'abc123ABC!@#\x1ba\x1bb\x1bc\x1b1\x1b2\x1b3'

    def verifyResults(self, transport, proto, parser):
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)

        for char in 'abc123ABC!@#':
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (char, None))
            self.assertEqual(occurrences(result), [])

        for char in 'abc123':
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (char, parser.ALT))
            self.assertEqual(occurrences(result), [])

        occs = occurrences(proto)
        self.assertFalse(occs, "%r should have been []" % (occs,))

class ServerFunctionKeysTests(ByteGroupingsMixin, unittest.TestCase):
    """Test for parsing and dispatching function keys (F1 - F12)
    """
    protocolFactory = ServerProtocol

    byteList = []
    for bytes in ('OP', 'OQ', 'OR', 'OS', # F1 - F4
                  '15~', '17~', '18~', '19~', # F5 - F8
                  '20~', '21~', '23~', '24~'): # F9 - F12
        byteList.append('\x1b[' + bytes)
    TEST_BYTES = ''.join(byteList)
    del byteList, bytes

    def verifyResults(self, transport, proto, parser):
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)
        for funcNum in range(1, 13):
            funcArg = getattr(parser, 'F%d' % (funcNum,))
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (funcArg, None))
            self.assertEqual(occurrences(result), [])
        self.assertFalse(occurrences(proto))

class ClientCursorMovementTests(ByteGroupingsMixin, unittest.TestCase):
    protocolFactory = ClientProtocol

    d2 = "\x1b[2B"
    r4 = "\x1b[4C"
    u1 = "\x1b[A"
    l2 = "\x1b[2D"
    # Move the cursor down two, right four, up one, left two, up one, left two
    TEST_BYTES = d2 + r4 + u1 + l2 + u1 + l2
    del d2, r4, u1, l2

    def verifyResults(self, transport, proto, parser):
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)

        for (method, count) in [('Down', 2), ('Forward', 4), ('Up', 1),
                                ('Backward', 2), ('Up', 1), ('Backward', 2)]:
            result = self.assertCall(occurrences(proto).pop(0), "cursor" + method, (count,))
            self.assertEqual(occurrences(result), [])
        self.assertFalse(occurrences(proto))

class ClientControlSequencesTests(unittest.TestCase, MockMixin):
    def setUp(self):
        self.transport = StringTransport()
        self.proto = Mock()
        self.parser = ClientProtocol(lambda: self.proto)
        self.parser.factory = self
        self.parser.makeConnection(self.transport)
        result = self.assertCall(occurrences(self.proto).pop(0), "makeConnection", (self.parser,))
        self.assertFalse(occurrences(result))

    def testSimpleCardinals(self):
        self.parser.dataReceived(
            ''.join([''.join(['\x1b[' + str(n) + ch for n in ('', 2, 20, 200)]) for ch in 'BACD']))
        occs = occurrences(self.proto)

        for meth in ("Down", "Up", "Forward", "Backward"):
            for count in (1, 2, 20, 200):
                result = self.assertCall(occs.pop(0), "cursor" + meth, (count,))
                self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testScrollRegion(self):
        self.parser.dataReceived('\x1b[5;22r\x1b[r')
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "setScrollRegion", (5, 22))
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "setScrollRegion", (None, None))
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testHeightAndWidth(self):
        self.parser.dataReceived("\x1b#3\x1b#4\x1b#5\x1b#6")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "doubleHeightLine", (True,))
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "doubleHeightLine", (False,))
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "singleWidthLine")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "doubleWidthLine")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testCharacterSet(self):
        self.parser.dataReceived(
            ''.join([''.join(['\x1b' + g + n for n in 'AB012']) for g in '()']))
        occs = occurrences(self.proto)

        for which in (G0, G1):
            for charset in (CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE_SPECIAL):
                result = self.assertCall(occs.pop(0), "selectCharacterSet", (charset, which))
                self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testShifting(self):
        self.parser.dataReceived("\x15\x14")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "shiftIn")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "shiftOut")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testSingleShifts(self):
        self.parser.dataReceived("\x1bN\x1bO")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "singleShift2")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "singleShift3")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testKeypadMode(self):
        self.parser.dataReceived("\x1b=\x1b>")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "applicationKeypadMode")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "numericKeypadMode")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testCursor(self):
        self.parser.dataReceived("\x1b7\x1b8")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "saveCursor")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "restoreCursor")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testReset(self):
        self.parser.dataReceived("\x1bc")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "reset")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testIndex(self):
        self.parser.dataReceived("\x1bD\x1bM\x1bE")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "index")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "reverseIndex")
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "nextLine")
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testModes(self):
        self.parser.dataReceived(
            "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "h")
        self.parser.dataReceived(
            "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "l")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "setModes", ([modes.KAM, modes.IRM, modes.LNM],))
        self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "resetModes", ([modes.KAM, modes.IRM, modes.LNM],))
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testErasure(self):
        self.parser.dataReceived(
            "\x1b[K\x1b[1K\x1b[2K\x1b[J\x1b[1J\x1b[2J\x1b[3P")
        occs = occurrences(self.proto)

        for meth in ("eraseToLineEnd", "eraseToLineBeginning", "eraseLine",
                     "eraseToDisplayEnd", "eraseToDisplayBeginning",
                     "eraseDisplay"):
            result = self.assertCall(occs.pop(0), meth)
            self.assertFalse(occurrences(result))

        result = self.assertCall(occs.pop(0), "deleteCharacter", (3,))
        self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testLineDeletion(self):
        self.parser.dataReceived("\x1b[M\x1b[3M")
        occs = occurrences(self.proto)

        for arg in (1, 3):
            result = self.assertCall(occs.pop(0), "deleteLine", (arg,))
            self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testLineInsertion(self):
        self.parser.dataReceived("\x1b[L\x1b[3L")
        occs = occurrences(self.proto)

        for arg in (1, 3):
            result = self.assertCall(occs.pop(0), "insertLine", (arg,))
            self.assertFalse(occurrences(result))
        self.assertFalse(occs)

    def testCursorPosition(self):
        methods(self.proto)['reportCursorPosition'] = (6, 7)
        self.parser.dataReceived("\x1b[6n")
        self.assertEqual(self.transport.value(), "\x1b[7;8R")
        occs = occurrences(self.proto)

        result = self.assertCall(occs.pop(0), "reportCursorPosition")
        # This isn't really an interesting assert, since it only tests that
        # our mock setup is working right, but I'll include it anyway.
        self.assertEqual(result, (6, 7))


    def test_applicationDataBytes(self):
        """
        Contiguous non-control bytes are passed to a single call to the
        C{write} method of the terminal to which the L{ClientProtocol} is
        connected.
        """
        occs = occurrences(self.proto)
        self.parser.dataReceived('a')
        self.assertCall(occs.pop(0), "write", ("a",))
        self.parser.dataReceived('bc')
        self.assertCall(occs.pop(0), "write", ("bc",))


    def _applicationDataTest(self, data, calls):
        occs = occurrences(self.proto)
        self.parser.dataReceived(data)
        while calls:
            self.assertCall(occs.pop(0), *calls.pop(0))
        self.assertFalse(occs, "No other calls should happen: %r" % (occs,))


    def test_shiftInAfterApplicationData(self):
        """
        Application data bytes followed by a shift-in command are passed to a
        call to C{write} before the terminal's C{shiftIn} method is called.
        """
        self._applicationDataTest(
            'ab\x15', [
                ("write", ("ab",)),
                ("shiftIn",)])


    def test_shiftOutAfterApplicationData(self):
        """
        Application data bytes followed by a shift-out command are passed to a
        call to C{write} before the terminal's C{shiftOut} method is called.
        """
        self._applicationDataTest(
            'ab\x14', [
                ("write", ("ab",)),
                ("shiftOut",)])


    def test_cursorBackwardAfterApplicationData(self):
        """
        Application data bytes followed by a cursor-backward command are passed
        to a call to C{write} before the terminal's C{cursorBackward} method is
        called.
        """
        self._applicationDataTest(
            'ab\x08', [
                ("write", ("ab",)),
                ("cursorBackward",)])


    def test_escapeAfterApplicationData(self):
        """
        Application data bytes followed by an escape character are passed to a
        call to C{write} before the terminal's handler method for the escape is
        called.
        """
        # Test a short escape
        self._applicationDataTest(
            'ab\x1bD', [
                ("write", ("ab",)),
                ("index",)])

        # And a long escape
        self._applicationDataTest(
            'ab\x1b[4h', [
                ("write", ("ab",)),
                ("setModes", ([4],))])

        # There's some other cases too, but they're all handled by the same
        # codepaths as above.



class ServerProtocolOutputTests(unittest.TestCase):
    """
    Tests for the bytes L{ServerProtocol} writes to its transport when its
    methods are called.
    """
    def test_nextLine(self):
        """
        L{ServerProtocol.nextLine} writes C{"\r\n"} to its transport.
        """
        # Why doesn't it write ESC E?  Because ESC E is poorly supported.  For
        # example, gnome-terminal (many different versions) fails to scroll if
        # it receives ESC E and the cursor is already on the last row.
        protocol = ServerProtocol()
        transport = StringTransport()
        protocol.makeConnection(transport)
        protocol.nextLine()
        self.assertEqual(transport.value(), "\r\n")



class DeprecationsTests(unittest.TestCase):
    """
    Tests to ensure deprecation of L{insults.colors} and L{insults.client}
    """

    def ensureDeprecated(self, message):
        """
        Ensures that the correct deprecation warning was issued.
        """
        warnings = self.flushWarnings()
        self.assertIs(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(warnings[0]['message'], message)
        self.assertEqual(len(warnings), 1)


    def test_colors(self):
        """
        The L{insults.colors} module is deprecated
        """
        namedAny('twisted.conch.insults.colors')
        self.ensureDeprecated("twisted.conch.insults.colors was deprecated "
                              "in Twisted 10.1.0: Please use "
                              "twisted.conch.insults.helper instead.")


    def test_client(self):
        """
        The L{insults.client} module is deprecated
        """
        namedAny('twisted.conch.insults.client')
        self.ensureDeprecated("twisted.conch.insults.client was deprecated "
                              "in Twisted 10.1.0: Please use "
                              "twisted.conch.insults.insults instead.")

https://t.me/RX1948 - 2025