#!/usr/bin/env python
#
### WARNING: this script requires the dnspython project to be installed.
### visit http://www.dnspython.org for downloads.
#
# Very simple script to help remove bad email addresses from a given list.
# It will stuff all culled addresses into a separate file for manual inspection
# with a quick note why it was culled.
#
# Input to this script will come in the form of a CSV file separated by tabs.
# Field 1 will be the email address, field 2 will be the first name and field 3
# will be the last name.
#
# Output format of the culled list will also be CSV w/ tab separation. Each
# row repeats the rejected input row (address, first name, last name) and
# appends one final field: the reason for rejection (a simple text string).
#
# We use a couple of basic techniques. The first is to translate all characters to
# lower case in the domain part of the address. Second, offline regex parsing to
# cull the stupidly bad email addresses. Third, I will check to see if I can even
# find an MX record for the domain. Fourth, it will try to connect to the MX server
# and fifth attempt to send a quick reminder message if you specify a message file.
#
# A possible future expansion would be to try addresses that failed at part 5 with
# the username portion in all lower case. (RFC 2821 states all recipient parts of
# an address must be treated as case sensitive, but not all servers do.)
# Another future expansion would be to embed a simple POP/IMAP client that can
# check a specific address to look for bounce/spam/other block type messages and
# further cull the list. Maybe that could be a separate program?? The
# possibilities are almost unbounded. :)
#
# A further expansion would involve certain black-listed domains like pookmail,
# mailinator, etc. to help further resolve the issue. No point in sending spam
# to people who won't get it.
#
# An immediately desirable expansion is caching the results of domain tests, e.g.
# if we test a domain, say, aol.com, we cache that result so we don't waste
# precious thread time checking it again. (netTest() keeps such a per-domain
# cache in domainCache.)
__author__ = "Nick Guy & Brian Guy"
__license__ = "GPL"
import sys,os,string,re,csv, smtplib, socket
from optparse import OptionParser
try:
import dns.resolver
except:
print "You need the DNS Python library from http://www.dnspython.org"
# Simple open() wrapper that checks for problems. Feel free to extend
# to check permissions, etc.
def openFile( fileName, mode, retCode ):
    """Open fileName in the given mode, or report the failure and exit.

    fileName -- path of the file to open.
    mode     -- mode string handed straight to open() (e.g. 'r', 'rb', 'wb').
    retCode  -- process exit status to use when the file cannot be opened.

    Returns the open file object. If open() raises IOError, a one-line
    message is printed and the script terminates via sys.exit(retCode).
    """
    try:
        return open( fileName, mode )
    except IOError:
        # Classify by the leading mode character so that variants such as
        # 'rb' and 'wb' are reported as reading/writing too, instead of
        # falling through to the generic label.
        leading = mode[:1]
        if leading == 'w':
            fileMode = "writing"
        elif leading == 'r':
            fileMode = "reading"
        else:
            fileMode = "special I/O"
        print("Can't open " + fileName + " for " + fileMode + ".")
        sys.exit(retCode)
# Here we look to see if we can query a nameserver to get the MX
# for a domain. If we see any error, we can safely assume the domain
# listed is problematic without further explanation.
def checkDomain( domain ):
    """Return the host name of the preferred mail exchanger for domain.

    domain -- domain part of an email address.

    Queries DNS for the domain's MX records and returns str() of the
    exchange belonging to the record with the lowest preference value
    (i.e. the primary MX). Returns False if the lookup or the selection
    fails with any ordinary exception.
    """
    try:
        answers = dns.resolver.query(domain, 'MX')
        # Lowest preference number == most preferred exchanger.
        primary = min(answers, key=lambda record: record.preference)
        return str(primary.exchange)
    except Exception:
        # Deliberately broad: any resolver failure means "bad domain".
        # Unlike a bare except, this still lets Ctrl-C / SystemExit through.
        return False
# Check here if the MX resolves to an IP. Any exceptions should
# signify it's not worth further checking out.
def checkMXResolve( mx ):
    """Resolve the mail exchanger host name mx to an address.

    mx -- host name of a mail exchanger (as returned by checkDomain).

    Returns str() of the first 'A' record answer, or False if the query
    (or reading its first answer) fails with any ordinary exception.
    """
    try:
        answers = dns.resolver.query(mx, 'A')
        return str(answers[0])
    except Exception:
        # Broad on purpose (any failure means "unusable MX"), but no longer
        # a bare except, so KeyboardInterrupt / SystemExit still propagate.
        return False
# Simply checks to see if we get a valid (250) response code from
# the MX host. If not, safely assume something is wrong and return
# false. Note that this may be obviated by the code to fully send
# a message.
def checkMXHelo( mxip, timeout=30 ):
    """Connect to the SMTP server at mxip and say HELO.

    mxip    -- host (name or address) handed to smtplib.SMTP().
    timeout -- seconds to wait on the connection before giving up, so one
               unresponsive server cannot stall the whole run.

    Returns True when the server answers HELO with code 250, otherwise
    False (including on any connection or protocol error). The connection
    is always shut down before returning.
    """
    server = None
    try:
        server = smtplib.SMTP(mxip, timeout=timeout)
        code, response = server.helo(socket.gethostname())
        return code == 250
    except Exception:
        # Any ordinary failure counts as a non-responsive MX; Ctrl-C and
        # SystemExit are intentionally not swallowed.
        return False
    finally:
        if server is not None:
            try:
                server.quit()
            except Exception:
                # The polite QUIT failed (e.g. never connected); just make
                # sure the socket is released.
                server.close()
# Function driver for all network based tests. Separating it out
# should make it an ideal pthread entry point. Note that since
# these tests are linear and dependent on prior results, finer thread
# granulation isn't recommended. Too much work for too little benefit.
def netTest( address ):
    """Run the chained network checks for the domain of address.

    address -- email address containing an '@' (ValueError is raised when
               there is none, as the two-name unpack requires two parts).

    The checks run in order -- MX lookup, MX address resolution, SMTP HELO --
    each one feeding its result to the next, and stop at the first failure.
    Returns True when all pass, otherwise the matching errorMessages entry
    (index 6, 7 or 8). The verdict is stored in domainCache and reused for
    later addresses at the same domain.
    """
    # Split on the last '@' and fold case: DNS names are case-insensitive,
    # so 'AOL.com' and 'aol.com' should share one cache entry.
    _localPart, domain = address.rsplit('@', 1)
    domain = domain.lower()

    # Have we seen this domain before?
    if domain in domainCache:
        return domainCache[domain]

    # (check function, message reported when that check returns False)
    stages = (
        (checkDomain, errorMessages[6]),
        (checkMXResolve, errorMessages[7]),
        (checkMXHelo, errorMessages[8]),
    )
    verdict = True
    subject = domain
    for probe, failure in stages:
        outcome = probe(subject)
        if outcome is False:
            verdict = failure
            if doVerbose:
                print(domain + ": " + failure)
            break
        # The output of one stage is the input of the next.
        subject = outcome
    ### Cascade additional network-based tests by extending `stages`.

    domainCache[domain] = verdict
    return verdict
# file handles and other file-scope stuff
listFile = False
culledFile = False
outFile = False
testFile = False

# Option holders
doNetTests = False
doTestMessage = False
doVerbose = False

# address containers
culledAddys = []    # rejected rows, each with the rejection reason appended
goodAddys = []      # accepted rows, each with the validation note appended
domainCache = {}    # domain -> cached verdict of the network tests
addyCache = {}      # address -> note recorded the first time it was seen

# Regex's for offline address examination. Raw strings keep the backslash
# escapes (\s, \.) intact for the regex engine instead of relying on Python
# passing unknown string escapes through.
regat = re.compile(r'@')
regmultat = re.compile(r'@.*@')           # more than one '@'
regnoaddr = re.compile(r'^\s*@')          # nothing but whitespace before '@'
regdomain = re.compile(r'@[^.]+\.[^.]+')  # '@' followed by something.something
regscrub = re.compile(r'[^@]+$')

# Error message listing. The rest of the script refers to these by index,
# so the order must not change.
errorMessages = [
    "duplicate address..",                                      # 0
    "Multiple @ symbols",                                       # 1
    "Missing @ symbol",                                         # 2
    "FCC regulated domain (or otherwise blacklisted domain)",   # 3
    "Missing username",                                         # 4
    "Missing domain",                                           # 5
    "Domain either doesn't exist or no MX is listed.",          # 6
    "Domain MX doesn't resolve",                                # 7
    "Non-responsive MX",                                        # 8
    "Validated via network test",                               # 9
    "Validated via offline tests only.",                        # 10
]
# Try to open the FCC scrub list (these are domains we CANNOT spam to legally.)
# The script exits with status 5 if scrublist.txt cannot be opened.
scrubFile = openFile('scrublist.txt', 'r', 5)
try:
    # One domain per line, surrounding whitespace removed. Blank lines are
    # dropped so that an empty domain part can never count as "listed", and
    # a set keeps the per-address membership test constant-time.
    scrubItems = set(entry.strip() for entry in scrubFile if entry.strip())
finally:
    scrubFile.close()
# Build the command line option parser. Note from the original author: for
# any option you expect to have an argument passed to, do NOT set
# 'default=False|True'. That seemed to corrupt OptionParser() in a way that
# made two (or more) options share the same argument. Also, -h|--help is
# autogenerated.
parser = OptionParser()

# Options that take a FILE argument: (short flag, long flag, dest, help).
for shortFlag, longFlag, destName, helpText in (
    ("-l", "--listfile", "listFileName",
     "Name of file containing email addresses. If not specified, stdin is used."),
    ("-c", "--culledfile", "culledFileName",
     "Name of file to write bad addresses to. If not specified, stderr is used."),
    ("-o", "--outfile", "outFileName",
     "Name of file to write good addresses to. If not specified, stdout is used."),
    ("-t", "--testmessage", "testFileName",
     "Path to the test message to send as part of the diagnostic. Implies --network."),
):
    parser.add_option(shortFlag, longFlag, dest=destName, help=helpText, metavar="FILE")

# Plain on/off switches: (short flag, long flag, help).
for shortFlag, longFlag, helpText in (
    ("-n", "--network", "Enable non-spammy network-based tests."),
    ("-v", "--verbose", "Show verbose output while running tests."),
):
    parser.add_option(shortFlag, longFlag, action="store_true", help=helpText, default=False)

options, args = parser.parse_args()
# Validate command line input. Check files to be readable/writable accordingly.
# Each openFile() call exits the script with its own status code on failure.
if options.listFileName is not None:
    listFile = openFile( options.listFileName, 'rb', 1 )
else:
    listFile = sys.stdin

if options.culledFileName is not None:
    culledFile = openFile( options.culledFileName, 'wb', 2 )
else:
    culledFile = sys.stderr

if options.outFileName is not None:
    outFile = openFile( options.outFileName, 'wb', 3 )
else:
    outFile = sys.stdout

if options.network:
    doNetTests = True

if options.verbose:
    doVerbose = True

if options.testFileName is not None:
    testFile = openFile( options.testFileName, 'rb', 4 )
    # The --testmessage help text promises "Implies --network", so turn the
    # network tests on here as well.
    doNetTests = True
    # NOTE(review): nothing visible in this script reads doTestMessage or
    # testFile yet; they are set for the message-sending step to pick up.
    doTestMessage = True
# read in email addresses.
addressReader = csv.reader(listFile, delimiter="\t")


def _offlineVerdict( address ):
    """Return the errorMessages entry for the first offline check that
    address fails, or None when it passes all of them."""
    # The multiple-@ test must come first; the split test below only
    # distinguishes "no @" from "exactly one @" once this has passed.
    if regmultat.search(address):
        return errorMessages[1]
    parts = address.split('@', 2)
    if len(parts) != 2:
        return errorMessages[2]
    # Compare the domain both as written and lower-cased so that
    # 'user@AOL.com' cannot slip past a scrub list entry of 'aol.com'.
    domain = parts[1]
    if domain in scrubItems or domain.lower() in scrubItems:
        return errorMessages[3]
    if regnoaddr.search(address):
        return errorMessages[4]
    if not regdomain.search(address):
        return errorMessages[5]
    ### Place additional regexes here.. ###
    return None


def _record( bucket, line, address, note ):
    """Append line (with note as an extra field) to bucket, remember the
    address in addyCache, and echo the decision when verbose."""
    bucket.append(line + [note])
    addyCache[address] = note
    if doVerbose:
        print(str(line) + ": " + note)


# main processing loop. Consider splitting this into the offline and online
# loops so you can thread the online loop.
for line in addressReader:
    # csv.reader yields an empty list for a blank input line; there is no
    # address to examine, so skip it instead of failing on line[0].
    if not line:
        continue
    address = line[0]

    # Have we seen this address before? Check the cache.
    if address in addyCache:
        culledAddys.append(line + [errorMessages[0]])
        if doVerbose:
            print(str(address) + ": " + errorMessages[0])
        continue

    # First process all the regex type offline stuff.
    problem = _offlineVerdict(address)
    if problem is not None:
        _record(culledAddys, line, address, problem)
    # Network based tests: existing domain and connecting to MX.
    elif doNetTests:
        result = netTest(address)
        if result is True:
            _record(goodAddys, line, address, errorMessages[9])
        else:
            # netTest hands back the reason text on failure.
            _record(culledAddys, line, address, result)
        ### This might be a good point to differentiate between unobtrusive
        ### Network tests and the full test-message version.
    else:
        _record(goodAddys, line, address, errorMessages[10])
# Write out the tab-separated results: rejected rows go to culledFile first,
# then accepted rows go to outFile. Finish with a success exit status.
for sink, rows in ((culledFile, culledAddys), (outFile, goodAddys)):
    csv.writer(sink, delimiter="\t").writerows(rows)
sys.exit(0)