| # coding: utf-8 |
| import asyncore |
| import email |
| import os |
| import shutil |
| import smtpd |
| import sys |
| from StringIO import StringIO |
| import tempfile |
| import threading |
| |
| from django.conf import settings |
| from django.core import mail |
| from django.core.mail import (EmailMessage, mail_admins, mail_managers, |
| EmailMultiAlternatives, send_mail, send_mass_mail) |
| from django.core.mail.backends import console, dummy, locmem, filebased, smtp |
| from django.core.mail.message import BadHeaderError |
| from django.test import TestCase |
| from django.utils.translation import ugettext_lazy |
| from django.utils.functional import wraps |
| |
| |
| def alter_django_settings(**kwargs): |
| oldvalues = {} |
| nonexistant = [] |
| for setting, newvalue in kwargs.iteritems(): |
| try: |
| oldvalues[setting] = getattr(settings, setting) |
| except AttributeError: |
| nonexistant.append(setting) |
| setattr(settings, setting, newvalue) |
| return oldvalues, nonexistant |
| |
| |
| def restore_django_settings(state): |
| oldvalues, nonexistant = state |
| for setting, oldvalue in oldvalues.iteritems(): |
| setattr(settings, setting, oldvalue) |
| for setting in nonexistant: |
| delattr(settings, setting) |
| |
| |
| def with_django_settings(**kwargs): |
| def decorator(test): |
| @wraps(test) |
| def decorated_test(self): |
| state = alter_django_settings(**kwargs) |
| try: |
| return test(self) |
| finally: |
| restore_django_settings(state) |
| return decorated_test |
| return decorator |
| |
| |
| class MailTests(TestCase): |
| """ |
| Non-backend specific tests. |
| """ |
| |
| def test_ascii(self): |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) |
| message = email.message() |
| self.assertEqual(message['Subject'].encode(), 'Subject') |
| self.assertEqual(message.get_payload(), 'Content') |
| self.assertEqual(message['From'], 'from@example.com') |
| self.assertEqual(message['To'], 'to@example.com') |
| |
| def test_multiple_recipients(self): |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com']) |
| message = email.message() |
| self.assertEqual(message['Subject'].encode(), 'Subject') |
| self.assertEqual(message.get_payload(), 'Content') |
| self.assertEqual(message['From'], 'from@example.com') |
| self.assertEqual(message['To'], 'to@example.com, other@example.com') |
| |
| def test_cc(self): |
| """Regression test for #7722""" |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) |
| message = email.message() |
| self.assertEqual(message['Cc'], 'cc@example.com') |
| self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com']) |
| |
| # Test multiple CC with multiple To |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com']) |
| message = email.message() |
| self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com') |
| self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com']) |
| |
| # Testing with Bcc |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'], bcc=['bcc@example.com']) |
| message = email.message() |
| self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com') |
| self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com', 'bcc@example.com']) |
| |
| def test_header_injection(self): |
| email = EmailMessage('Subject\nInjection Test', 'Content', 'from@example.com', ['to@example.com']) |
| self.assertRaises(BadHeaderError, email.message) |
| email = EmailMessage(ugettext_lazy('Subject\nInjection Test'), 'Content', 'from@example.com', ['to@example.com']) |
| self.assertRaises(BadHeaderError, email.message) |
| |
| def test_space_continuation(self): |
| """ |
| Test for space continuation character in long (ascii) subject headers (#7747) |
| """ |
| email = EmailMessage('Long subject lines that get wrapped should use a space continuation character to get expected behaviour in Outlook and Thunderbird', 'Content', 'from@example.com', ['to@example.com']) |
| message = email.message() |
| self.assertEqual(message['Subject'], 'Long subject lines that get wrapped should use a space continuation\n character to get expected behaviour in Outlook and Thunderbird') |
| |
| def test_message_header_overrides(self): |
| """ |
| Specifying dates or message-ids in the extra headers overrides the |
| default values (#9233) |
| """ |
| headers = {"date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} |
| email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) |
| self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent') |
| |
| def test_from_header(self): |
| """ |
| Make sure we can manually set the From header (#9214) |
| """ |
| email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| message = email.message() |
| self.assertEqual(message['From'], 'from@example.com') |
| |
| def test_multiple_message_call(self): |
| """ |
| Regression for #13259 - Make sure that headers are not changed when |
| calling EmailMessage.message() |
| """ |
| email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| message = email.message() |
| self.assertEqual(message['From'], 'from@example.com') |
| message = email.message() |
| self.assertEqual(message['From'], 'from@example.com') |
| |
| def test_unicode_address_header(self): |
| """ |
| Regression for #11144 - When a to/from/cc header contains unicode, |
| make sure the email addresses are parsed correctly (especially with |
| regards to commas) |
| """ |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com']) |
| self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com') |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com']) |
| self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com') |
| |
| def test_unicode_headers(self): |
| email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"], |
| headers={"Sender": '"Firstname Sürname" <sender@example.com>', |
| "Comments": 'My Sürname is non-ASCII'}) |
| message = email.message() |
| self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=') |
| self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>') |
| self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=') |
| |
| def test_safe_mime_multipart(self): |
| """ |
| Make sure headers can be set with a different encoding than utf-8 in |
| SafeMIMEMultipart as well |
| """ |
| headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} |
| subject, from_email, to = 'hello', 'from@example.com', '"Sürname, Firstname" <to@example.com>' |
| text_content = 'This is an important message.' |
| html_content = '<p>This is an <strong>important</strong> message.</p>' |
| msg = EmailMultiAlternatives('Message from Firstname Sürname', text_content, from_email, [to], headers=headers) |
| msg.attach_alternative(html_content, "text/html") |
| msg.encoding = 'iso-8859-1' |
| self.assertEqual(msg.message()['To'], '=?iso-8859-1?q?S=FCrname=2C_Firstname?= <to@example.com>') |
| self.assertEqual(msg.message()['Subject'].encode(), u'=?iso-8859-1?q?Message_from_Firstname_S=FCrname?=') |
| |
| def test_encoding(self): |
| """ |
| Regression for #12791 - Encode body correctly with other encodings |
| than utf-8 |
| """ |
| email = EmailMessage('Subject', 'Firstname Sürname is a great guy.', 'from@example.com', ['other@example.com']) |
| email.encoding = 'iso-8859-1' |
| message = email.message() |
| self.assertTrue(message.as_string().startswith('Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: other@example.com')) |
| self.assertEqual(message.get_payload(), 'Firstname S=FCrname is a great guy.') |
| |
| # Make sure MIME attachments also works correctly with other encodings than utf-8 |
| text_content = 'Firstname Sürname is a great guy.' |
| html_content = '<p>Firstname Sürname is a <strong>great</strong> guy.</p>' |
| msg = EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com']) |
| msg.encoding = 'iso-8859-1' |
| msg.attach_alternative(html_content, "text/html") |
| self.assertEqual(msg.message().get_payload(0).as_string(), 'Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\nFirstname S=FCrname is a great guy.') |
| self.assertEqual(msg.message().get_payload(1).as_string(), 'Content-Type: text/html; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\n<p>Firstname S=FCrname is a <strong>great</strong> guy.</p>') |
| |
| def test_attachments(self): |
| """Regression test for #9367""" |
| headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} |
| subject, from_email, to = 'hello', 'from@example.com', 'to@example.com' |
| text_content = 'This is an important message.' |
| html_content = '<p>This is an <strong>important</strong> message.</p>' |
| msg = EmailMultiAlternatives(subject, text_content, from_email, [to], headers=headers) |
| msg.attach_alternative(html_content, "text/html") |
| msg.attach("an attachment.pdf", "%PDF-1.4.%...", mimetype="application/pdf") |
| msg_str = msg.message().as_string() |
| message = email.message_from_string(msg_str) |
| self.assertTrue(message.is_multipart()) |
| self.assertEqual(message.get_content_type(), 'multipart/mixed') |
| self.assertEqual(message.get_default_type(), 'text/plain') |
| payload = message.get_payload() |
| self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') |
| self.assertEqual(payload[1].get_content_type(), 'application/pdf') |
| |
| def test_dummy_backend(self): |
| """ |
| Make sure that dummy backends returns correct number of sent messages |
| """ |
| connection = dummy.EmailBackend() |
| email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| self.assertEqual(connection.send_messages([email, email, email]), 3) |
| |
| def test_arbitrary_keyword(self): |
| """ |
| Make sure that get_connection() accepts arbitrary keyword that might be |
| used with custom backends. |
| """ |
| c = mail.get_connection(fail_silently=True, foo='bar') |
| self.assertTrue(c.fail_silently) |
| |
| def test_custom_backend(self): |
| """Test custom backend defined in this suite.""" |
| conn = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') |
| self.assertTrue(hasattr(conn, 'test_outbox')) |
| email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| conn.send_messages([email]) |
| self.assertEqual(len(conn.test_outbox), 1) |
| |
| def test_backend_arg(self): |
| """Test backend argument of mail.get_connection()""" |
| self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.smtp.EmailBackend'), smtp.EmailBackend)) |
| self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.locmem.EmailBackend'), locmem.EmailBackend)) |
| self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) |
| self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) |
| tmp_dir = tempfile.mkdtemp() |
| try: |
| self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) |
| finally: |
| shutil.rmtree(tmp_dir) |
| self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) |
| |
| @with_django_settings( |
| EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend', |
| ADMINS=[('nobody', 'nobody@example.com')], |
| MANAGERS=[('nobody', 'nobody@example.com')]) |
| def test_connection_arg(self): |
| """Test connection argument to send_mail(), et. al.""" |
| mail.outbox = [] |
| |
| # Send using non-default connection |
| connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') |
| send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) |
| self.assertEqual(mail.outbox, []) |
| self.assertEqual(len(connection.test_outbox), 1) |
| self.assertEqual(connection.test_outbox[0].subject, 'Subject') |
| |
| connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') |
| send_mass_mail([ |
| ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), |
| ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']), |
| ], connection=connection) |
| self.assertEqual(mail.outbox, []) |
| self.assertEqual(len(connection.test_outbox), 2) |
| self.assertEqual(connection.test_outbox[0].subject, 'Subject1') |
| self.assertEqual(connection.test_outbox[1].subject, 'Subject2') |
| |
| connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') |
| mail_admins('Admin message', 'Content', connection=connection) |
| self.assertEqual(mail.outbox, []) |
| self.assertEqual(len(connection.test_outbox), 1) |
| self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message') |
| |
| connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') |
| mail_managers('Manager message', 'Content', connection=connection) |
| self.assertEqual(mail.outbox, []) |
| self.assertEqual(len(connection.test_outbox), 1) |
| self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message') |
| |
| def test_dont_mangle_from_in_body(self): |
| # Regression for #13433 - Make sure that EmailMessage doesn't mangle |
| # 'From ' in message body. |
| email = EmailMessage('Subject', 'From the future', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| self.assertFalse('>From the future' in email.message().as_string()) |
| |
| |
| class BaseEmailBackendTests(object): |
| email_backend = None |
| |
| def setUp(self): |
| self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend) |
| |
| def tearDown(self): |
| restore_django_settings(self.__settings_state) |
| |
| def assertStartsWith(self, first, second): |
| if not first.startswith(second): |
| self.longMessage = True |
| self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.") |
| |
| def get_mailbox_content(self): |
| raise NotImplementedError |
| |
| def flush_mailbox(self): |
| raise NotImplementedError |
| |
| def get_the_message(self): |
| mailbox = self.get_mailbox_content() |
| self.assertEqual(len(mailbox), 1, |
| "Expected exactly one message, got %d.\n%r" % (len(mailbox), [ |
| m.as_string() for m in mailbox])) |
| return mailbox[0] |
| |
| def test_send(self): |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) |
| num_sent = mail.get_connection().send_messages([email]) |
| self.assertEqual(num_sent, 1) |
| message = self.get_the_message() |
| self.assertEqual(message["subject"], "Subject") |
| self.assertEqual(message.get_payload(), "Content") |
| self.assertEqual(message["from"], "from@example.com") |
| self.assertEqual(message.get_all("to"), ["to@example.com"]) |
| |
| def test_send_many(self): |
| email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com']) |
| email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com']) |
| num_sent = mail.get_connection().send_messages([email1, email2]) |
| self.assertEqual(num_sent, 2) |
| messages = self.get_mailbox_content() |
| self.assertEqual(len(messages), 2) |
| self.assertEqual(messages[0].get_payload(), "Content1") |
| self.assertEqual(messages[1].get_payload(), "Content2") |
| |
| def test_send_verbose_name(self): |
| email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>', |
| ["to@example.com"]) |
| email.send() |
| message = self.get_the_message() |
| self.assertEqual(message["subject"], "Subject") |
| self.assertEqual(message.get_payload(), "Content") |
| self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>") |
| |
| @with_django_settings(MANAGERS=[('nobody', 'nobody@example.com')]) |
| def test_html_mail_managers(self): |
| """Test html_message argument to mail_managers""" |
| mail_managers('Subject', 'Content', html_message='HTML Content') |
| message = self.get_the_message() |
| |
| self.assertEqual(message.get('subject'), '[Django] Subject') |
| self.assertEqual(message.get_all('to'), ['nobody@example.com']) |
| self.assertTrue(message.is_multipart()) |
| self.assertEqual(len(message.get_payload()), 2) |
| self.assertEqual(message.get_payload(0).get_payload(), 'Content') |
| self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') |
| self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') |
| self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') |
| |
| @with_django_settings(ADMINS=[('nobody', 'nobody@example.com')]) |
| def test_html_mail_admins(self): |
| """Test html_message argument to mail_admins """ |
| mail_admins('Subject', 'Content', html_message='HTML Content') |
| message = self.get_the_message() |
| |
| self.assertEqual(message.get('subject'), '[Django] Subject') |
| self.assertEqual(message.get_all('to'), ['nobody@example.com']) |
| self.assertTrue(message.is_multipart()) |
| self.assertEqual(len(message.get_payload()), 2) |
| self.assertEqual(message.get_payload(0).get_payload(), 'Content') |
| self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') |
| self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') |
| self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') |
| |
| @with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')], |
| MANAGERS=[('nobody', 'nobody+manager@example.com')]) |
| def test_manager_and_admin_mail_prefix(self): |
| """ |
| String prefix + lazy translated subject = bad output |
| Regression for #13494 |
| """ |
| mail_managers(ugettext_lazy('Subject'), 'Content') |
| message = self.get_the_message() |
| self.assertEqual(message.get('subject'), '[Django] Subject') |
| |
| self.flush_mailbox() |
| mail_admins(ugettext_lazy('Subject'), 'Content') |
| message = self.get_the_message() |
| self.assertEqual(message.get('subject'), '[Django] Subject') |
| |
| @with_django_settings(ADMINS=(), MANAGERS=()) |
| def test_empty_admins(self): |
| """ |
| Test that mail_admins/mail_managers doesn't connect to the mail server |
| if there are no recipients (#9383) |
| """ |
| mail_admins('hi', 'there') |
| self.assertEqual(self.get_mailbox_content(), []) |
| mail_managers('hi', 'there') |
| self.assertEqual(self.get_mailbox_content(), []) |
| |
| def test_message_cc_header(self): |
| """ |
| Regression test for #7722 |
| """ |
| email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) |
| mail.get_connection().send_messages([email]) |
| message = self.get_the_message() |
| self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ') |
| |
| def test_idn_send(self): |
| """ |
| Regression test for #14301 |
| """ |
| self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com'])) |
| message = self.get_the_message() |
| self.assertEqual(message.get('subject'), 'Subject') |
| self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') |
| self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') |
| |
| self.flush_mailbox() |
| m = EmailMessage('Subject', 'Content', 'from@öäü.com', |
| [u'to@öäü.com'], cc=[u'cc@öäü.com']) |
| m.send() |
| message = self.get_the_message() |
| self.assertEqual(message.get('subject'), 'Subject') |
| self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') |
| self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') |
| self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com') |
| |
| def test_recipient_without_domain(self): |
| """ |
| Regression test for #15042 |
| """ |
| self.assertTrue(send_mail("Subject", "Content", "tester", ["django"])) |
| message = self.get_the_message() |
| self.assertEqual(message.get('subject'), 'Subject') |
| self.assertEqual(message.get('from'), "tester") |
| self.assertEqual(message.get('to'), "django") |
| |
| |
| class LocmemBackendTests(BaseEmailBackendTests, TestCase): |
| email_backend = 'django.core.mail.backends.locmem.EmailBackend' |
| |
| def get_mailbox_content(self): |
| return [m.message() for m in mail.outbox] |
| |
| def flush_mailbox(self): |
| mail.outbox = [] |
| |
| def tearDown(self): |
| super(LocmemBackendTests, self).tearDown() |
| mail.outbox = [] |
| |
| def test_locmem_shared_messages(self): |
| """ |
| Make sure that the locmen backend populates the outbox. |
| """ |
| connection = locmem.EmailBackend() |
| connection2 = locmem.EmailBackend() |
| email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| connection.send_messages([email]) |
| connection2.send_messages([email]) |
| self.assertEqual(len(mail.outbox), 2) |
| |
| |
| class FileBackendTests(BaseEmailBackendTests, TestCase): |
| email_backend = 'django.core.mail.backends.filebased.EmailBackend' |
| |
| def setUp(self): |
| super(FileBackendTests, self).setUp() |
| self.tmp_dir = tempfile.mkdtemp() |
| self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir) |
| |
| def tearDown(self): |
| restore_django_settings(self.__settings_state) |
| shutil.rmtree(self.tmp_dir) |
| super(FileBackendTests, self).tearDown() |
| |
| def flush_mailbox(self): |
| for filename in os.listdir(self.tmp_dir): |
| os.unlink(os.path.join(self.tmp_dir, filename)) |
| |
| def get_mailbox_content(self): |
| messages = [] |
| for filename in os.listdir(self.tmp_dir): |
| session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n') |
| messages.extend(email.message_from_string(m) for m in session if m) |
| return messages |
| |
| def test_file_sessions(self): |
| """Make sure opening a connection creates a new file""" |
| msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) |
| connection = mail.get_connection() |
| connection.send_messages([msg]) |
| |
| self.assertEqual(len(os.listdir(self.tmp_dir)), 1) |
| message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0]))) |
| self.assertEqual(message.get_content_type(), 'text/plain') |
| self.assertEqual(message.get('subject'), 'Subject') |
| self.assertEqual(message.get('from'), 'from@example.com') |
| self.assertEqual(message.get('to'), 'to@example.com') |
| |
| connection2 = mail.get_connection() |
| connection2.send_messages([msg]) |
| self.assertEqual(len(os.listdir(self.tmp_dir)), 2) |
| |
| connection.send_messages([msg]) |
| self.assertEqual(len(os.listdir(self.tmp_dir)), 2) |
| |
| msg.connection = mail.get_connection() |
| self.assertTrue(connection.open()) |
| msg.send() |
| self.assertEqual(len(os.listdir(self.tmp_dir)), 3) |
| msg.send() |
| self.assertEqual(len(os.listdir(self.tmp_dir)), 3) |
| |
| |
| class ConsoleBackendTests(BaseEmailBackendTests, TestCase): |
| email_backend = 'django.core.mail.backends.console.EmailBackend' |
| |
| def setUp(self): |
| super(ConsoleBackendTests, self).setUp() |
| self.__stdout = sys.stdout |
| self.stream = sys.stdout = StringIO() |
| |
| def tearDown(self): |
| del self.stream |
| sys.stdout = self.__stdout |
| del self.__stdout |
| super(ConsoleBackendTests, self).tearDown() |
| |
| def flush_mailbox(self): |
| self.stream = sys.stdout = StringIO() |
| |
| def get_mailbox_content(self): |
| messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n') |
| return [email.message_from_string(m) for m in messages if m] |
| |
| def test_console_stream_kwarg(self): |
| """ |
| Test that the console backend can be pointed at an arbitrary stream. |
| """ |
| s = StringIO() |
| connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) |
| send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) |
| self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) |
| |
| |
| class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): |
| """ |
| Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: |
| http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup |
| """ |
| |
| def __init__(self, *args, **kwargs): |
| threading.Thread.__init__(self) |
| smtpd.SMTPServer.__init__(self, *args, **kwargs) |
| self._sink = [] |
| self.active = False |
| self.active_lock = threading.Lock() |
| self.sink_lock = threading.Lock() |
| |
| def process_message(self, peer, mailfrom, rcpttos, data): |
| m = email.message_from_string(data) |
| maddr = email.Utils.parseaddr(m.get('from'))[1] |
| if mailfrom != maddr: |
| return "553 '%s' != '%s'" % (mailfrom, maddr) |
| self.sink_lock.acquire() |
| self._sink.append(m) |
| self.sink_lock.release() |
| |
| def get_sink(self): |
| self.sink_lock.acquire() |
| try: |
| return self._sink[:] |
| finally: |
| self.sink_lock.release() |
| |
| def flush_sink(self): |
| self.sink_lock.acquire() |
| self._sink[:] = [] |
| self.sink_lock.release() |
| |
| def start(self): |
| assert not self.active |
| self.__flag = threading.Event() |
| threading.Thread.start(self) |
| self.__flag.wait() |
| |
| def run(self): |
| self.active = True |
| self.__flag.set() |
| while self.active and asyncore.socket_map: |
| self.active_lock.acquire() |
| asyncore.loop(timeout=0.1, count=1) |
| self.active_lock.release() |
| asyncore.close_all() |
| |
| def stop(self): |
| assert self.active |
| self.active = False |
| self.join() |
| |
| |
| class SMTPBackendTests(BaseEmailBackendTests, TestCase): |
| email_backend = 'django.core.mail.backends.smtp.EmailBackend' |
| |
| @classmethod |
| def setUpClass(cls): |
| cls.server = FakeSMTPServer(('127.0.0.1', 0), None) |
| cls.settings = alter_django_settings( |
| EMAIL_HOST="127.0.0.1", |
| EMAIL_PORT=cls.server.socket.getsockname()[1]) |
| cls.server.start() |
| |
| @classmethod |
| def tearDownClass(cls): |
| cls.server.stop() |
| |
| def setUp(self): |
| super(SMTPBackendTests, self).setUp() |
| self.server.flush_sink() |
| |
| def tearDown(self): |
| self.server.flush_sink() |
| super(SMTPBackendTests, self).tearDown() |
| |
| def flush_mailbox(self): |
| self.server.flush_sink() |
| |
| def get_mailbox_content(self): |
| return self.server.get_sink() |