Server : Apache/2.4.18 (Ubuntu) System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64 User : oppastar ( 1041) PHP Version : 7.0.33-0ubuntu0.16.04.15 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, Directory : /usr/lib/python2.7/dist-packages/twisted/internet/test/ |
Upload File : |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for L{twisted.internet.epollreactor}. """ from __future__ import division, absolute_import from twisted.trial.unittest import TestCase try: from twisted.internet.epollreactor import _ContinuousPolling except ImportError: _ContinuousPolling = None from twisted.internet.task import Clock from twisted.internet.error import ConnectionDone class Descriptor(object): """ Records reads and writes, as if it were a C{FileDescriptor}. """ def __init__(self): self.events = [] def fileno(self): return 1 def doRead(self): self.events.append("read") def doWrite(self): self.events.append("write") def connectionLost(self, reason): reason.trap(ConnectionDone) self.events.append("lost") class ContinuousPollingTests(TestCase): """ L{_ContinuousPolling} can be used to read and write from C{FileDescriptor} objects. """ def test_addReader(self): """ Adding a reader when there was previously no reader starts up a C{LoopingCall}. """ poller = _ContinuousPolling(Clock()) self.assertEqual(poller._loop, None) reader = object() self.assertFalse(poller.isReading(reader)) poller.addReader(reader) self.assertNotEqual(poller._loop, None) self.assertTrue(poller._loop.running) self.assertIs(poller._loop.clock, poller._reactor) self.assertTrue(poller.isReading(reader)) def test_addWriter(self): """ Adding a writer when there was previously no writer starts up a C{LoopingCall}. """ poller = _ContinuousPolling(Clock()) self.assertEqual(poller._loop, None) writer = object() self.assertFalse(poller.isWriting(writer)) poller.addWriter(writer) self.assertNotEqual(poller._loop, None) self.assertTrue(poller._loop.running) self.assertIs(poller._loop.clock, poller._reactor) self.assertTrue(poller.isWriting(writer)) def test_removeReader(self): """ Removing a reader stops the C{LoopingCall}. """ poller = _ContinuousPolling(Clock()) reader = object() poller.addReader(reader) poller.removeReader(reader) self.assertEqual(poller._loop, None) self.assertEqual(poller._reactor.getDelayedCalls(), []) self.assertFalse(poller.isReading(reader)) def test_removeWriter(self): """ Removing a writer stops the C{LoopingCall}. """ poller = _ContinuousPolling(Clock()) writer = object() poller.addWriter(writer) poller.removeWriter(writer) self.assertEqual(poller._loop, None) self.assertEqual(poller._reactor.getDelayedCalls(), []) self.assertFalse(poller.isWriting(writer)) def test_removeUnknown(self): """ Removing unknown readers and writers silently does nothing. """ poller = _ContinuousPolling(Clock()) poller.removeWriter(object()) poller.removeReader(object()) def test_multipleReadersAndWriters(self): """ Adding multiple readers and writers results in a single C{LoopingCall}. """ poller = _ContinuousPolling(Clock()) writer = object() poller.addWriter(writer) self.assertNotEqual(poller._loop, None) poller.addWriter(object()) self.assertNotEqual(poller._loop, None) poller.addReader(object()) self.assertNotEqual(poller._loop, None) poller.addReader(object()) poller.removeWriter(writer) self.assertNotEqual(poller._loop, None) self.assertTrue(poller._loop.running) self.assertEqual(len(poller._reactor.getDelayedCalls()), 1) def test_readerPolling(self): """ Adding a reader causes its C{doRead} to be called every 1 milliseconds. """ reactor = Clock() poller = _ContinuousPolling(reactor) desc = Descriptor() poller.addReader(desc) self.assertEqual(desc.events, []) reactor.advance(0.00001) self.assertEqual(desc.events, ["read"]) reactor.advance(0.00001) self.assertEqual(desc.events, ["read", "read"]) reactor.advance(0.00001) self.assertEqual(desc.events, ["read", "read", "read"]) def test_writerPolling(self): """ Adding a writer causes its C{doWrite} to be called every 1 milliseconds. """ reactor = Clock() poller = _ContinuousPolling(reactor) desc = Descriptor() poller.addWriter(desc) self.assertEqual(desc.events, []) reactor.advance(0.001) self.assertEqual(desc.events, ["write"]) reactor.advance(0.001) self.assertEqual(desc.events, ["write", "write"]) reactor.advance(0.001) self.assertEqual(desc.events, ["write", "write", "write"]) def test_connectionLostOnRead(self): """ If a C{doRead} returns a value indicating disconnection, C{connectionLost} is called on it. """ reactor = Clock() poller = _ContinuousPolling(reactor) desc = Descriptor() desc.doRead = lambda: ConnectionDone() poller.addReader(desc) self.assertEqual(desc.events, []) reactor.advance(0.001) self.assertEqual(desc.events, ["lost"]) def test_connectionLostOnWrite(self): """ If a C{doWrite} returns a value indicating disconnection, C{connectionLost} is called on it. """ reactor = Clock() poller = _ContinuousPolling(reactor) desc = Descriptor() desc.doWrite = lambda: ConnectionDone() poller.addWriter(desc) self.assertEqual(desc.events, []) reactor.advance(0.001) self.assertEqual(desc.events, ["lost"]) def test_removeAll(self): """ L{_ContinuousPolling.removeAll} removes all descriptors and returns the readers and writers. """ poller = _ContinuousPolling(Clock()) reader = object() writer = object() both = object() poller.addReader(reader) poller.addReader(both) poller.addWriter(writer) poller.addWriter(both) removed = poller.removeAll() self.assertEqual(poller.getReaders(), []) self.assertEqual(poller.getWriters(), []) self.assertEqual(len(removed), 3) self.assertEqual(set(removed), set([reader, writer, both])) def test_getReaders(self): """ L{_ContinuousPolling.getReaders} returns a list of the read descriptors. """ poller = _ContinuousPolling(Clock()) reader = object() poller.addReader(reader) self.assertIn(reader, poller.getReaders()) def test_getWriters(self): """ L{_ContinuousPolling.getWriters} returns a list of the write descriptors. """ poller = _ContinuousPolling(Clock()) writer = object() poller.addWriter(writer) self.assertIn(writer, poller.getWriters()) if _ContinuousPolling is None: skip = "epoll not supported in this environment."