Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Doc/library/poplib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ A :class:`POP3` instance has the following methods:
.. versionadded:: 3.4


.. method:: POP3.auth(mechanism, authobject=None, initial_response=None)

Authenticate using the POP3 ``AUTH`` command as specified in :rfc:`5034`.

If *initial_response* is provided (``bytes`` or ``str``), it is
base64-encoded and appended to the command after a single space.

If *authobject* is provided, it is called with the server’s ``bytes``
challenge (already base64-decoded) and must return the client response
(``bytes`` or ``str``). Return ``b'*'`` to abort the exchange.


Instances of :class:`POP3_SSL` have no additional methods. The interface of this
subclass is identical to its parent.

Expand Down
46 changes: 46 additions & 0 deletions Lib/poplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import re
import socket
import sys
import base64

try:
import ssl
Expand Down Expand Up @@ -217,6 +218,51 @@ def pass_(self, pswd):
"""
return self._shortcmd('PASS %s' % pswd)

def auth(self, mechanism, authobject=None, initial_response=None):
"""Authenticate to the POP3 server using the AUTH command (RFC 5034).

Result is 'response'.
"""
if authobject is not None and initial_response is not None:
raise ValueError('authobject and initial_response are mutually exclusive')

if initial_response is not None:
if isinstance(initial_response, str):
initial_response = initial_response.encode(self.encoding)
b64 = base64.b64encode(initial_response).decode('ascii')
return self._shortcmd(f'AUTH {mechanism} {b64}'.rstrip())

if authobject is None:
return self._shortcmd(f'AUTH {mechanism}')

self._putcmd(f'AUTH {mechanism}')
while True:
resp = self._getresp()
if resp[:3] == b'+OK':
return resp

challenge_b64 = resp[1:].lstrip(b' ')
if challenge_b64:
try:
challenge = base64.b64decode(challenge_b64)
except Exception:
padded = challenge_b64 + b'=' * (-len(challenge_b64) % 4)
challenge = base64.b64decode(padded, validate=False)
else:
challenge = b''

response = authobject(challenge)
if response is None:
response = b''
if isinstance(response, str):
response = response.encode(self.encoding)

if response == b'*':
self._putcmd('*')
return self._getresp()

self._putcmd(base64.b64encode(response).decode('ascii'))


def stat(self):
"""Get mailbox status.
Expand Down
88 changes: 86 additions & 2 deletions Lib/test/test_poplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
# a real test suite

import base64
import poplib
import socket
import os
Expand Down Expand Up @@ -49,7 +49,7 @@

class DummyPOP3Handler(asynchat.async_chat):

CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
CAPAS = {'UIDL': [], 'SASL': ['PLAIN'], 'IMPLEMENTATION': ['python-testlib-pop-server']}
enable_UTF8 = False

def __init__(self, conn):
Expand All @@ -59,6 +59,8 @@ def __init__(self, conn):
self.push('+OK dummy pop3 server ready. <timestamp>')
self.tls_active = False
self.tls_starting = False
self._auth_pending = False
self._auth_mech = None

def collect_incoming_data(self, data):
self.in_buffer.append(data)
Expand All @@ -67,6 +69,20 @@ def found_terminator(self):
line = b''.join(self.in_buffer)
line = str(line, 'ISO-8859-1')
self.in_buffer = []

if self._auth_pending:
self._auth_pending = False
if line == '*':
self.push('-ERR authentication cancelled')
return
try:
base64.b64decode(line.encode('ascii'))
except Exception:
self.push('-ERR invalid base64')
return
self.push('+OK Logged in.')
return

cmd = line.split(' ')[0].lower()
space = line.find(' ')
if space != -1:
Expand All @@ -85,6 +101,28 @@ def handle_error(self):
def push(self, data):
asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

def cmd_auth(self, arg):
parts = arg.split()
if not parts:
self.push('-ERR missing mechanism')
return
mech = parts[0].upper()
if mech != 'PLAIN':
self.push('-ERR unsupported mechanism')
return
if len(parts) >= 2:
try:
base64.b64decode(parts[1].encode('ascii'))
except Exception:
self.push('-ERR invalid base64')
return
self.push('+OK Logged in.')
else:
self._auth_pending = True
self._auth_mech = mech
self.in_buffer.clear()
self.push('+ ')

def cmd_echo(self, arg):
# sends back the received string (used by the test suite)
self.push(arg)
Expand Down Expand Up @@ -286,6 +324,49 @@ def test_pass_(self):
self.assertOK(self.client.pass_('python'))
self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

def test_auth_plain_initial_response(self):
secret = b"user\x00adminuser\x00password"
resp = self.client.auth("PLAIN", initial_response=secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_plain_challenge_response(self):
secret = b"user\x00adminuser\x00password"
def authobject(challenge):
return secret
resp = self.client.auth("PLAIN", authobject=authobject)
self.assertStartsWith(resp, b"+OK")

def test_auth_rejects_conflicting_args(self):
with self.assertRaises(ValueError):
self.client.auth("PLAIN", authobject=lambda c: b"x", initial_response=b"y")

def test_auth_unsupported_mechanism(self):
with self.assertRaises(poplib.error_proto):
self.client.auth("FOO")

def test_auth_cancel(self):
def authobject(_challenge):
return b"*"
with self.assertRaises(poplib.error_proto):
self.client.auth("PLAIN", authobject=authobject)

def test_auth_mechanism_case_insensitive(self):
secret = b"user\x00adminuser\x00password"
# use lowercase mechanism name to ensure server accepts
resp = self.client.auth("plain", initial_response=secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_initial_response_str(self):
secret = "user\x00adminuser\x00password" # str, not bytes
resp = self.client.auth("PLAIN", initial_response=secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_authobject_returns_str(self):
def authobject(challenge):
return "user\x00adminuser\x00password"
resp = self.client.auth("PLAIN", authobject=authobject)
self.assertStartsWith(resp, b"+OK")

def test_stat(self):
self.assertEqual(self.client.stat(), (10, 100))

Expand Down Expand Up @@ -434,6 +515,9 @@ def __init__(self, conn):
self.push('+OK dummy pop3 server ready. <timestamp>')
self.tls_active = True
self.tls_starting = False
# Initialize AUTH state like DummyPOP3Handler to avoid AttributeError
self._auth_pending = False
self._auth_mech = None


@requires_ssl
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add RFC 5034 AUTH support to poplib
Loading