https://t.me/RX1948
Server : Apache/2.4.18 (Ubuntu)
System : Linux canvaswebdesign 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64
User : oppastar ( 1041)
PHP Version : 7.0.33-0ubuntu0.16.04.15
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
Directory :  /usr/lib/python2.7/dist-packages/twisted/words/protocols/jabber/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/python2.7/dist-packages/twisted/words/protocols/jabber/sasl_mechanisms.py
# -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Protocol agnostic implementations of SASL authentication mechanisms.
"""

import binascii, random, time, os
from hashlib import md5

from zope.interface import Interface, Attribute, implements


class ISASLMechanism(Interface):
    name = Attribute("""Common name for the SASL Mechanism.""")

    def getInitialResponse():
        """
        Get the initial client response, if defined for this mechanism.

        @return: initial client response string.
        @rtype: C{str}.
        """


    def getResponse(challenge):
        """
        Get the response to a server challenge.

        @param challenge: server challenge.
        @type challenge: C{str}.
        @return: client response.
        @rtype: C{str}.
        """



class Anonymous(object):
    """
    Implements the ANONYMOUS SASL authentication mechanism.

    This mechanism is defined in RFC 2245.
    """
    implements(ISASLMechanism)
    name = 'ANONYMOUS'

    def getInitialResponse(self):
        return None



class Plain(object):
    """
    Implements the PLAIN SASL authentication mechanism.

    The PLAIN SASL authentication mechanism is defined in RFC 2595.
    """
    implements(ISASLMechanism)

    name = 'PLAIN'

    def __init__(self, authzid, authcid, password):
        self.authzid = authzid or ''
        self.authcid = authcid or ''
        self.password = password or ''


    def getInitialResponse(self):
        return "%s\x00%s\x00%s" % (self.authzid.encode('utf-8'),
                                   self.authcid.encode('utf-8'),
                                   self.password.encode('utf-8'))



class DigestMD5(object):
    """
    Implements the DIGEST-MD5 SASL authentication mechanism.

    The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
    """
    implements(ISASLMechanism)

    name = 'DIGEST-MD5'

    def __init__(self, serv_type, host, serv_name, username, password):
        """
        @param serv_type: An indication of what kind of server authentication
            is being attempted against.  For example, C{u"xmpp"}.
        @type serv_type: C{unicode}

        @param host: The authentication hostname.  Also known as the realm.
            This is used as a scope to help select the right credentials.
        @type host: C{unicode}

        @param serv_name: An additional identifier for the server.
        @type serv_name: C{unicode}

        @param username: The authentication username to use to respond to a
            challenge.
        @type username: C{unicode}

        @param username: The authentication password to use to respond to a
            challenge.
        @type password: C{unicode}
        """
        self.username = username
        self.password = password
        self.defaultRealm = host

        self.digest_uri = u'%s/%s' % (serv_type, host)
        if serv_name is not None:
            self.digest_uri += u'/%s' % (serv_name,)


    def getInitialResponse(self):
        return None


    def getResponse(self, challenge):
        directives = self._parse(challenge)

        # Compat for implementations that do not send this along with
        # a succesful authentication.
        if 'rspauth' in directives:
            return ''

        try:
            realm = directives['realm']
        except KeyError:
            realm = self.defaultRealm.encode(directives['charset'])

        return self._genResponse(directives['charset'],
                                 realm,
                                 directives['nonce'])


    def _parse(self, challenge):
        """
        Parses the server challenge.

        Splits the challenge into a dictionary of directives with values.

        @return: challenge directives and their values.
        @rtype: C{dict} of C{str} to C{str}.
        """
        s = challenge
        paramDict = {}
        cur = 0
        remainingParams = True
        while remainingParams:
            # Parse a param. We can't just split on commas, because there can
            # be some commas inside (quoted) param values, e.g.:
            # qop="auth,auth-int"

            middle = s.index("=", cur)
            name = s[cur:middle].lstrip()
            middle += 1
            if s[middle] == '"':
                middle += 1
                end = s.index('"', middle)
                value = s[middle:end]
                cur = s.find(',', end) + 1
                if cur == 0:
                    remainingParams = False
            else:
                end = s.find(',', middle)
                if end == -1:
                    value = s[middle:].rstrip()
                    remainingParams = False
                else:
                    value = s[middle:end].rstrip()
                cur = end + 1
            paramDict[name] = value

        for param in ('qop', 'cipher'):
            if param in paramDict:
                paramDict[param] = paramDict[param].split(',')

        return paramDict

    def _unparse(self, directives):
        """
        Create message string from directives.

        @param directives: dictionary of directives (names to their values).
                           For certain directives, extra quotes are added, as
                           needed.
        @type directives: C{dict} of C{str} to C{str}
        @return: message string.
        @rtype: C{str}.
        """

        directive_list = []
        for name, value in directives.iteritems():
            if name in ('username', 'realm', 'cnonce',
                        'nonce', 'digest-uri', 'authzid', 'cipher'):
                directive = '%s="%s"' % (name, value)
            else:
                directive = '%s=%s' % (name, value)

            directive_list.append(directive)

        return ','.join(directive_list)


    def _calculateResponse(self, cnonce, nc, nonce,
                            username, password, realm, uri):
        """
        Calculates response with given encoded parameters.

        @return: The I{response} field of a response to a Digest-MD5 challenge
            of the given parameters.
        @rtype: L{bytes}
        """
        def H(s):
            return md5(s).digest()

        def HEX(n):
            return binascii.b2a_hex(n)

        def KD(k, s):
            return H('%s:%s' % (k, s))

        a1 = "%s:%s:%s" % (
            H("%s:%s:%s" % (username, realm, password)), nonce, cnonce)
        a2 = "AUTHENTICATE:%s" % (uri,)

        response = HEX(KD(HEX(H(a1)), "%s:%s:%s:%s:%s" % (
                    nonce, nc, cnonce, "auth", HEX(H(a2)))))
        return response


    def _genResponse(self, charset, realm, nonce):
        """
        Generate response-value.

        Creates a response to a challenge according to section 2.1.2.1 of
        RFC 2831 using the C{charset}, C{realm} and C{nonce} directives
        from the challenge.
        """
        try:
            username = self.username.encode(charset)
            password = self.password.encode(charset)
            digest_uri = self.digest_uri.encode(charset)
        except UnicodeError:
            # TODO - add error checking
            raise

        nc = '%08x' % (1,) # TODO: support subsequent auth.
        cnonce = self._gen_nonce()
        qop = 'auth'

        # TODO - add support for authzid
        response = self._calculateResponse(cnonce, nc, nonce,
                                           username, password, realm,
                                           digest_uri)

        directives = {'username': username,
                      'realm' : realm,
                      'nonce' : nonce,
                      'cnonce' : cnonce,
                      'nc' : nc,
                      'qop' : qop,
                      'digest-uri': digest_uri,
                      'response': response,
                      'charset': charset}

        return self._unparse(directives)


    def _gen_nonce(self):
        return md5("%s:%s:%s" % (random.random(),
                                 time.gmtime(),
                                 os.getpid())).hexdigest()

https://t.me/RX1948 - 2025