- import mysql.connector
- import re
- import requests
- import urllib.parse
- from datetime import datetime
- import imaplib
- from email.parser import BytesParser
- from email.utils import getaddresses
- import AI
- import jsParser
- from pynotifier import Notification
- import platform
- from plyer import notification
- from urllib.parse import urlparse
- import pickle
- import numpy
- import yandex_search
- from bs4 import BeautifulSoup
-
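- # How many of the most recent messages each MailBox caches and pre-checks on startup.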
- MAILS_TO_CACHE = 5
-
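- # Domains that URL discovery never follows.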
- SKIP = ["facebook.com","w3.org"]
- class Logger:
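- """Minimal append-only file logger that writes timestamped INFO/DEBUG/ERROR lines."""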
- def __init__(self, filename):
- self.file = open(filename, "a+")
-
- def error(self, msg: str):
- self.file.write("[ERROR] " + self.generate_log(msg))
- self.file.flush()
-
- def debug(self, msg):
- self.file.write("[DEBUG] " + self.generate_log(msg))
- self.file.flush()
-
- def info(self, msg: str):
- self.file.write("[INFO] " + self.generate_log(msg))
- self.file.flush()
-
- def generate_log(self, msg: str):
- now = datetime.now()
- timestamp = now.strftime("%b %d %Y %H:%M:%S")
- return timestamp + " : " + msg + "\n"
-
- def stop(self):
- self.file.close()
-
-
- class MailBox:
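- """Watches one IMAP INBOX: caches recent mails, checks them for spam and moves flagged mails into the detected spam folder."""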
-
- def __init__(self, email: str, passwd: str, server: str, port: int, mysql_creds: dict, sensitivity: int,
- threshold: int, logger: Logger):
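- # sensitivity: spam points added per suspicious finding; threshold: total points above which a mail is treated as spam.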
- try:
- self.log = logger
- self.log.info("Started MailBox listener for " + email)
- self.FROM_EMAIL = email
- self.FROM_PWD = passwd
- self.SMTP_SERVER = server
- self.SMTP_PORT = port
- self.imap = imaplib.IMAP4_SSL(self.SMTP_SERVER, self.SMTP_PORT)
- self.imap.login(self.FROM_EMAIL, self.FROM_PWD)
- self.imap.select('INBOX')
- self.mysql_creds = mysql_creds
- self.threshold = threshold
- self.sensitivity = sensitivity
- self.get_ids()
- # The mails dictionary format is <mail_id>: (<Mail object>, <modified time>)
- self.mails = {}
- self.mysql = mysql_creds
- self.spam_folder = self.detect_spam_folder()
- self.get_new_mails()
- except Exception as e:
- self.log.error(str(e))
-
- def get_ids(self):
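- # Fetch all message UIDs from INBOX; on IMAP failure, reconnect and retry.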
- try:
- self.imap.select("INBOX", False)
- typ, ids = self.imap.uid('search', None, 'ALL')
- self.ids = ids[0].decode().split()
- self.log.debug(str(self.ids))
- except Exception as e:
- self.log.error(str(e))
- self.imap = imaplib.IMAP4_SSL(self.SMTP_SERVER, self.SMTP_PORT)
- self.imap.login(self.FROM_EMAIL, self.FROM_PWD)
- self.imap.select('INBOX', False)
- self.get_ids()
-
- def get_new_mails(self):
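- # Fill the cache with the newest mails; mails already present in the `logs` table are kept as placeholders and not re-checked.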
- mysql_db = mysql.connector.connect(
- user=self.mysql_creds["mysql_username"],
- password=self.mysql_creds["mysql_password"],
- database=self.mysql_creds["mysql_database"],
- host=self.mysql_creds["mysql_host"]
- )
- cursor = mysql_db.cursor()
- # Stop early if the inbox holds fewer messages than the cache size.
- while len(self.mails) <= MAILS_TO_CACHE and len(self.mails) < len(self.ids):
- id = self.ids[-1 - len(self.mails)]
- cursor.execute(
- "SELECT mail_id FROM logs WHERE mail_id = %s AND account = %s", (id, self.FROM_EMAIL))
- cursor.fetchall()
- if cursor.rowcount > 0:
- self.mails[id] = "[BUFFERED MAIL]"
- continue
- typ, messageRaw = self.imap.uid('fetch', id, '(RFC822)')
- self.log.info("Found mail with id: " + str(id))
- email = messageRaw[0][1]
- self.mails[id] = Mail(email, self.mysql_creds, self.threshold, self.sensitivity, self.FROM_EMAIL, self.log,
- id, self.spam_folder)
- self.mails[id].check_spam()
- self.log.info("Checked new mail with id: " + str(id))
-
- def refresh_new_mails(self):
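- # Diff the current UID list against the previous one and check only newly arrived mails.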
- self.log.info("Refreshing mails")
- old_ids = self.ids
- self.get_ids()
- #self.log.debug(str(self.ids))
- diff_ids = set(self.ids) - set(old_ids)
- for id in diff_ids:
- self.log.info("Found new mail with id: " + str(id))
- typ, messageRaw = self.imap.uid('fetch', id, '(RFC822)')
- self.log.info("Fetched the new mail")
- email = messageRaw[0][1]
- self.mails[id] = Mail(email, self.mysql_creds, self.threshold, self.sensitivity, self.FROM_EMAIL, self.log,
- id, self.spam_folder)
- self.mails[id].check_spam()
-
- def check_spam(self):
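- # Re-check every cached mail and move anything flagged as spam into the detected spam folder.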
- for mail_name in self.mails:
- mail_obj = self.mails[mail_name]
- if not isinstance(mail_obj, Mail):
- continue  # skip "[BUFFERED MAIL]" placeholders for already-logged mails
- mail_obj.check_spam()
- self.log.info("Checked new mail with id: " + str(mail_name))
- if mail_obj.get_spam() == 1:
- self.log.info("Found spam mail sent by: " + getaddresses(mail_obj.parsed_mail.get_all('from', []))[0][1])
- result = self.imap.uid('COPY', mail_obj.mail_id, mail_obj.spam_folder)
- if result[0] == 'OK':
- mov, data = self.imap.uid('STORE', mail_obj.mail_id, '+FLAGS', r'(\Deleted)')
- self.imap.expunge()
-
- def detect_spam_folder(self):
- # Prefer a Junk folder; fall back to Trash if no Junk folder is advertised.
- folder = ""
- for i in self.imap.list()[1]:
- parts = i.decode().split(' "/" ')
- if "Junk" in parts[0]:
- folder = parts[1]
- break
- if "Trash" in parts[0]:
- folder = parts[1]
- # Strip the surrounding quotes some servers put around folder names.
- return folder.strip("\"")
-
-
- class Mail:
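- """One fetched message: extracts its text, collects the URLs it contains and accumulates spam points from the individual checks."""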
-
- def __init__(self, mail_data, mysql_creds, threshold, sensitivity, account, logger, mail_id, spam_folder):
- self.JS_IMPORT_REGEX = r'<script[^>]*src="([^"]*)"[^>]*>'
- self.JS_EXTRACT_REGEX = r'(?s)<script[^>]*>(.*?)</script>'
- self.URL_REGEX = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|[^\x00-\x7F]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
- self.parser = BytesParser()
- self.sensitivity = sensitivity
- self.threshold = threshold
- self.log = logger
- self.spam_folder = spam_folder
- self.mysql_db = mysql.connector.connect(
- user=mysql_creds["mysql_username"],
- password=mysql_creds["mysql_password"],
- database=mysql_creds["mysql_database"],
- host=mysql_creds["mysql_host"]
- )
- self.account = account
- self.spam_points = 0
- self.js_code = {}
- self.urls_in_document = []
- self.documents = {}
- self.mail_id = mail_id
- # The headers are defined as <key>:<to_remove_from key>
- # -1 is used to define the last header, after that comes the mail contents
- self.whitelisted = False
- self.blacklisted = False
- self.parsed_mail = self.parser.parsebytes(mail_data)
- self.header_data = dict(self.parsed_mail)
- self.message = ""
- self.extract_message()
- self._spam = -1
- self.check_whitelist()
- self.check_blacklisted()
- self.urls = re.findall(self.URL_REGEX, self.message)
- for i in range(len(self.urls)):
- self.urls[i] = self.urls[i].strip()
-
- def add_domain_to_blacklist(self, url):
- domain = urlparse(url).hostname.encode("idna").decode("utf-8")
- cursor = self.mysql_db.cursor()
- cursor.execute("INSERT INTO new_blacklists(domain) VALUES(%s)", (domain,))
- cursor.execute("INSERT INTO domain_blacklist(domain) VALUES(%s)", (domain,))
- self.mysql_db.commit()
- cursor.close()
-
- def extract_message(self):
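- # Concatenate every decodable text part of the (possibly multipart) message into self.message.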
- if self.parsed_mail.is_multipart():
- for i in self.parsed_mail.get_payload():
- payload = i.get_payload(decode=True)
- try:
- self.message += payload.decode("utf-8")
- except AttributeError as e:
- self.log.error("AttributeError while trying to get message from mail with id " + str(self.mail_id))
- print(e)
- except UnicodeDecodeError as e:
- self.log.error(
- "UnicodeDecodeError while trying to get message from mail with id " + str(self.mail_id))
- print(e)
- else:
- payload = self.parsed_mail.get_payload(decode=True)
- try:
- self.message += payload.decode("utf-8")
- except AttributeError as e:
- self.log.error("AttributeError while trying to get message from mail with id " + str(self.mail_id))
- print(e)
- except UnicodeDecodeError as e:
- self.log.error("UnicodeDecodeError while trying to get message from mail with id " + str(self.mail_id))
- print(e)
-
- def check_blacklisted(self, url=None):
- cursor = self.mysql_db.cursor()
- if url is not None:
- url = url.encode("idna").decode("utf-8")
- cursor.execute("SELECT * FROM domain_blacklist WHERE domain LIKE %s;", (url,))
- cursor.fetchall()
- if cursor.rowcount > 0:
- cursor.close()
- return True
- return False
- mail = getaddresses(self.parsed_mail.get_all('from', []))[0][1]
- cursor.execute("SELECT * FROM mail_blacklist WHERE mail = %s;", (mail,))
- cursor.fetchall()
- if cursor.rowcount >= 1:
- print("Blacklisted")
- self.blacklisted = True
-
- def check_whitelist(self, url=None):
- cursor = self.mysql_db.cursor()
- if url is not None:
- url = url.encode("idna").decode("utf-8")
- cursor.execute("SELECT * FROM domain_whitelist WHERE domain LIKE %s;", ("%" + url + "%",))
- cursor.fetchall()
- if cursor.rowcount > 0:
- cursor.close()
- return True
- return False
- mail = getaddresses(self.parsed_mail.get_all('from', []))[0][1]
- cursor.execute("SELECT * FROM mail_whitelist WHERE mail = %s;", (mail,))
- cursor.fetchall()
- if cursor.rowcount >= 1:
- self.whitelisted = True
-
- def check_special_chars(self):
- for url in self.urls:
- parsed = urllib.parse.urlparse(url.encode("idna").decode("utf-8").encode("utf-8").decode("idna"))
- for char in parsed.netloc:
- if not char == ".":
- if not char.encode("utf-8") == char.encode("idna"):
- print("Special char detected")
- self._spam = 1
-
- def aiPredict(self,data):
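- # Score one URL with the pickled model; the feature order must match the 9 keys produced by get_url_data.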
- with open("aiModel","rb") as m:
- aiModel = pickle.load(m)
- ai_in = (data["dir_num"], data["index_num"], data["length"], data["out_resources"], data["robots_entries"], data["special_char_num"], data["subdomain_len"], data["subdomain_num"], data["tld_trust"])
- return aiModel.predict(numpy.reshape(ai_in,(1, 9)))
-
- def find_list_resources(self, tag, attribute, soup):
- resources = []
- for x in soup.find_all(tag):
- try:
- resources.append(x[attribute])
- except KeyError:
- pass
- return resources
-
- def get_url_data(self,url,yandex,timeout=30):
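- # Build the per-URL feature dictionary used by aiPredict: length, path depth, TLD trust, subdomain stats,
- # special (non-IDNA) characters, Yandex index count, robots.txt entries and number of external resources.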
- data = {}
- data["length"] = (len(url.split("://")[1].split("?")[0]))
- data["dir_num"] = (url.find("/")-2)
- parsed = urlparse(url.encode("idna").decode("utf-8").encode("utf-8").decode("idna"))
- hostname_split = parsed.hostname.split(".")
- data["tld_trust"] = int(hostname_split[-1].lower() in ["com", "org", "net"])
- data["subdomain_num"] = len(hostname_split) - 2
- data["subdomain_len"] = len("".join(hostname_split[:-2]))
- special_char_count = 0
- for char in parsed.hostname:
- if char == ".":
- continue
- if not char.encode("utf-8") == char.encode("idna"):
- special_char_count += 1
- data["special_char_num"] = special_char_count
- #Advanced data extraction
- try:
- data["index_num"] = int(yandex.search("site:{}".format(parsed.hostname)).found["all"])
- except yandex_search.NoResultsException:
- data["index_num"] = 0
- robot_entry_counter = 0
- try:
- response = requests.get("{}://{}/robots.txt".format(parsed.scheme, parsed.netloc), allow_redirects=True, verify=False, timeout=timeout)
- if response.status_code == 200:
- lines = response.text.split("\n")
- lines = [x for x in lines if x != ""]
- robot_entry_counter += len([x for x in lines if x[0] != "#"])
- else:
- pass
- except Exception as e:
- print(e)
- data["robots_entries"] = robot_entry_counter
- try:
- req = requests.get(url, verify=False, timeout=timeout)
- if req.status_code == 200:
- soup = BeautifulSoup(req.text,'html.parser')
- image_scr = self.find_list_resources('img',"src",soup)
- script_src = self.find_list_resources('script',"src",soup)
- css_link = self.find_list_resources("link","href",soup)
- all_links = image_scr + css_link + script_src
- out_links = []
- for link in all_links:
- parsed_link = urlparse(link)
- if parsed_link.hostname != parsed.hostname:
- out_links.append(link)
- data["out_resources"] = len(out_links)
- else:
- data["out_resources"] = -1
- except Exception as e:
- print(e)
- data["out_resources"] = -1
- return data
-
- def check_url(self, url):
- yandex = yandex_search.Yandex(api_user='raporcubaba@gmail.com', api_key='03.1042294429:b8e679f9acadef49ebab0d9726ccef58')
- data = self.get_url_data(url,yandex,timeout=10)
- if self.aiPredict(data):
- self.add_domain_to_blacklist(url)
- self.spam_points += self.sensitivity
-
- def check_js(self):
- for url in self.js_code:
- for js in self.js_code[url]:
- if self.check_blacklisted(url=url):
- self._spam = 1
- if self.check_js_code(js):
- self.add_domain_to_blacklist(url)
- self.spam_points += self.sensitivity
-
- def check_disallowed_chars(self, url_start, chars=("<", ">", "'", "\"")):
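- # True if the URL contains any of the given characters, either raw or URL-encoded once or twice.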
- url = url_start
- for char in chars:
- if char in url:
- return True
- if urllib.parse.quote_plus(char) in url:
- return True
- if urllib.parse.quote_plus(urllib.parse.quote_plus(char)) in url:
- return True
- return False
-
- def check_tld(self,url):
- if urlparse(url).hostname.split(".")[-1] in ["info","tk","gq"]:
- self._spam = 1
-
- def check_domain_name(self,url):
- pass # TODO fill this up
-
- def discover_url(self, urls):
- # Crawl each URL (following redirects manually), run the per-URL checks, and recurse into links found in fetched documents.
- for url in urls:
- already_seen = False
- for seen in self.documents:
- if urlparse(url).hostname == urlparse(seen).hostname:
- already_seen = True
- if already_seen:
- continue
- while 1:
- if self.check_whitelist(url=url):
- break
- if self.check_blacklisted(url=url):
- self._spam = 1
- return
- self.log.info(url)
- self.check_disallowed_chars(url)
- self.keyword_search(url)
- self.check_xss(url)
- self.check_url(url)
- self.check_tld(url)
- self.check_domain_name(url)
- try:
- r = requests.get(url, allow_redirects=False)
- except requests.exceptions.ConnectionError:
- break
- try:
- self.documents[url] = r.content.decode()
- for found in self.extract_urls(self.documents[url]):
- already_seen = False
- for seen in self.documents:
- if urlparse(found).hostname == urlparse(seen).hostname:
- already_seen = True
- if already_seen:
- continue
- skip = False
- for j in SKIP:
- if j in found:
- skip = True
- if skip:
- continue
- self.discover_url([found.strip()])
- except UnicodeDecodeError:
- pass
- # Follow 302/303 redirects manually (allow_redirects=False above).
- if r.status_code == 302 or r.status_code == 303:
- url = urllib.parse.urljoin(url, r.headers["location"])
- continue
- else:
- break
-
- def keyword_search(self, url):
- keywords = self.mysql_db.cursor()
- keywords.execute("SELECT * FROM keywordlist;")
- result = keywords.fetchall()
- for row in result:
- if row[0] in url:
- if ".".join(urlparse(url).hostname.split(".")[-2:]) != row[1]:
- self._spam = 1
-
- def check_xss(self, url):
- malicious = self.check_disallowed_chars(url)
- if malicious:
- print("ADDED SPAM POINTS")
- self.spam_points += self.sensitivity
-
- #TODO add deobfuscation
- def extract_javascript(self): #TODO not working
- p = re.compile(self.JS_IMPORT_REGEX)
- for doc in self.documents:
- self.js_code[doc] = []
- for url in p.findall(self.documents[doc]):
- r = requests.get(url, allow_redirects=False)
- if r.status_code == 200:
- self.js_code[doc].append(r.content)
- p = re.compile(self.JS_EXTRACT_REGEX)
- for js in p.findall(self.documents[doc]):
- if js != "":
- self.js_code[doc].append(js)
-
- def extract_urls(self, doc): # Extract URLs from the documents and save them to the array urls_in_document
- p = re.compile(self.URL_REGEX)
- res = p.findall(doc)
- return res
-
- def check_stored_xss(self):
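- # Extract the JavaScript from every fetched document and score it; repeated hits blacklist the hosting domain.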
- self.extract_javascript()
- for url in self.js_code:
- url_spam_count = 0
- for js in self.js_code[url]:
- if url_spam_count > 6:
- self.add_domain_to_blacklist(url)
- if self.check_blacklisted(url=url):
- self._spam = 1
- return
- if self.check_js_code(js):
- url_spam_count += 3
- self.spam_points += self.sensitivity
-
- def check_js_code(self, code):
- parsedJs = jsParser.parseJavascript(code, False)
- return AI.aiPredict(parsedJs)
-
- def log_mail(self):
- cursor = self.mysql_db.cursor()
- domain = getaddresses(self.parsed_mail.get_all('from', []))[0][1]
- cursor.execute(
- "INSERT INTO logs (sender_domain, result, account, mail_id) VALUES (%s, %s, %s, %s)",
- (domain, self.get_spam(), self.account, self.mail_id))
- self.mysql_db.commit()
- cursor.close()
-
- def check_spam(self):
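- # Per-mail pipeline: whitelist/blacklist short-circuit, URL discovery, stored-XSS check, then notification and logging.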
- if self.whitelisted:
- self._spam = 0
- self.log_mail()
- return
- elif self.blacklisted:
- self._spam = 1
- else:
- self.discover_url(self.urls)
-
- self.check_stored_xss()
- if self.threshold < self.spam_points:
- self._spam = 1
- if self.get_spam() == 1:
- self.log.info("Mail moved to spam with id: " + str(self.mail_id))
- if platform.system() == "Windows":
- notification.notify(
- title='Found spam mail by: ' + getaddresses(self.parsed_mail.get_all('from', []))[0][1],
- message=self.account,
- app_icon=None,
- timeout=10, )
- else:
- Notification(
- title='Found spam mail by: ' + getaddresses(self.parsed_mail.get_all('from', []))[0][1],
- description=self.account,
- duration=10,
- urgency=Notification.URGENCY_CRITICAL
- ).send()
- self.log_mail()
-
- def get_spam(self) -> int:
- if self.whitelisted:
- return 0
- return self._spam
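-
-
- # --- Usage sketch (illustrative, not part of the original module) ---
- # A minimal polling loop showing how the classes above fit together. Every concrete value here
- # (log file name, address, password, IMAP host/port, MySQL credentials, sensitivity/threshold)
- # is a placeholder assumption; only the Logger and MailBox APIs defined above are used.
- if __name__ == "__main__":
- import time
- log = Logger("spamfilter.log")
- creds = {"mysql_username": "user", "mysql_password": "secret",
- "mysql_database": "spamdb", "mysql_host": "localhost"}
- box = MailBox("me@example.com", "app-password", "imap.example.com", 993,
- creds, sensitivity=5, threshold=10, logger=log)
- while True:
- box.refresh_new_mails()
- time.sleep(60)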