Python工具
2021-02-17 12:20
标签:调试 构造 char main oda ipa 文件 服务器端 poplib Python工具 标签:调试 构造 char main oda ipa 文件 服务器端 poplib 原文地址:https://www.cnblogs.com/xzqpy/p/12697575.html邮箱首发
import os,sys
import smtplib
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
from backend import core
import os
import poplib,email,telnetlib
import datetime,time,sys,traceback
from email.parser import Parser
from email.header import decode_header
from email.utils import parseaddr
import logging
class down_email():
def __init__(self,user,password,eamil_server):
# 输入邮件地址, 口令和POP3服务器地址:
self.user = user
# 此处密码是授权码,用于登录第三方邮件客户端
self.password = password
self.pop3_server = eamil_server
# 获得msg的编码
def guess_charset(self,msg):
charset = msg.get_charset()
if charset is None:
content_type = msg.get(‘Content-Type‘, ‘‘).lower()
pos = content_type.find(‘charset=‘)
if pos >= 0:
charset = content_type[pos + 8:].strip()
return charset
#获取邮件内容
def get_content(self,msg):
content=‘‘
content_type = msg.get_content_type()
# print(‘content_type:‘,content_type)
if content_type == ‘text/plain‘: # or content_type == ‘text/html‘
content = msg.get_payload(decode=True)
charset = self.guess_charset(msg)
if charset:
content = content.decode(charset)
return content
# 字符编码转换
# @staticmethod
def decode_str(self,str_in):
value, charset = decode_header(str_in)[0]
if charset:
value = value.decode(charset)
return value
# 解析邮件,获取附件
def get_att(self,msg_in, str_day,filename1,path):
attachment_files = []
for part in msg_in.walk():
# 获取附件名称类型
file_name = part.get_param("name") #如果是附件,这里就会取出附件的文件名
# file_name = part.get_filename() #获取file_name的第2中方法
# contType = part.get_content_type()
if file_name:
h = email.header.Header(file_name)
# 对附件名称进行解码
dh = email.header.decode_header(h)
filename = dh[0][0]
if dh[0][1]:
# 将附件名称可读化
filename = self.decode_str(str(filename, dh[0][1]))
# print(filename)
# filename = filename.encode("utf-8")
# 下载附件
data = part.get_payload(decode=True)
# 在指定目录下创建文件,注意二进制文件需要用wb模式打开
filename_ok=filename.replace(‘zip‘,‘ok‘)
if os.path.exists(path+filename_ok):
print(‘文件已存在‘)
break
print("写入的文件路径",path+‘/‘+filename)
att_file = open(path + filename, ‘wb‘)
att_file.write(data) # 保存附件
att_file.close()
attachment_files.append(filename)
else:
# 不是附件,是文本内容
print(self.get_content(part))
# # 如果ture的话内容是没用的
# if not part.is_multipart():
# # 解码出文本内容,直接输出来就可以了。
# print(part.get_payload(decode=True).decode(‘utf-8‘))
return attachment_files
def send_mail(self ,msg_obj,to_addr_list):
try:
# smtp协议的默认端口是25,QQ邮箱smtp服务器端口是465,第一个参数是smtp服务器地址,第二个参数是端口,第三个参数是超时设置,这里必须使用ssl证书,要不链接不上服务器
server = smtplib.SMTP_SSL(self.pop3_server, 465, timeout=2)
# 登录邮箱
server.login(self.user, self.password)
# 发送邮件,第一个参数是发送方地址,第二个参数是接收方列表,列表中可以有多个接收方地址,表示发送给多个邮箱,msg.as_string()将MIMEText对象转化成文本
server.sendmail(self.user, to_addr_list, msg_obj.as_string())
server.quit()
print("success")
except Exception as e:
logging.exception(e)
print(‘Faild:%s‘)
def run_ing(self,path):
str_day = str(datetime.date.today())# 日期赋值
# 连接到POP3服务器,有些邮箱服务器需要ssl加密,可以使用poplib.POP3_SSL
try:
telnetlib.Telnet(self.pop3_server, 995)
self.server = poplib.POP3_SSL(self.pop3_server, 995, timeout=10)
except:
time.sleep(5)
self.server = poplib.POP3(self.pop3_server, 110, timeout=10)
# server.set_debuglevel(1) # 可以打开或关闭调试信息
# 打印POP3服务器的欢迎文字:
print("?",self.server.getwelcome().decode(‘utf-8‘))
# 身份认证:
self.server.user(self.user)
self.server.pass_(self.password)
# 返回邮件数量和占用空间:
# list()返回所有邮件的编号:
resp, mails, octets = self.server.list()
# 可以查看返回的列表类似[b‘1 82923‘, b‘2 2184‘, ...]
index = len(mails)
for i in range(index, 0, -1):# 倒序遍历邮件
# for i in range(1, index + 1):# 顺序遍历邮件
resp, lines, octets = self.server.retr(i)
# lines存储了邮件的原始文本的每一行,
# 邮件的原始文本:
msg_content = b‘\r\n‘.join(lines).decode(‘gbk‘)
# 解析邮件:
msg = Parser().parsestr(msg_content)
#获取邮件的发件人,收件人, 抄送人,主题
# hdr, addr = parseaddr(msg.get(‘From‘))
# From = self.decode_str(hdr)
# hdr, addr = parseaddr(msg.get(‘To‘))
# To = self.decode_str(hdr)
# 方法2:from or Form均可
From = parseaddr(msg.get(‘from‘))[1]
To = parseaddr(msg.get(‘To‘))[1]
Cc=parseaddr(msg.get_all(‘Cc‘))[1]# 抄送人
Subject = self.decode_str(msg.get(‘Subject‘))
print(‘from:%s,to:%s,Cc:%s,subject:%s‘%(From,To,Cc,Subject))
# 获取邮件时间,格式化收件时间
date1 = time.strptime(msg.get("Date")[0:24], ‘%a, %d %b %Y %H:%M:%S‘)
# 邮件时间格式转换
date2 = time.strftime("%Y-%m-%d",date1)
if date2 ",attach_file)
# 可以根据邮件索引号直接从服务器删除邮件:
# self.server.dele(7)
self.server.quit()
def analysis_file(have_zip_path):
import subprocess
dir_list=subprocess.getoutput(f‘find {have_zip_path} -type f -iname "*.zip" ‘) #寻找这个路径下所有的zip文件
# dir_list=subprocess.getoutput(f‘unzip {dir_list} ‘)
dir_list=dir_list.split(‘\n‘)
dir_list=[i for i in dir_list if i ]
for file_path in dir_list: #/data/loop/ada.zip
#对这个文件解压大 loop
unzip_path=file_path.replace(‘loophole‘,‘loophole_dir‘)
unzip_path=unzip_path.replace(‘.zip‘,‘‘)#创建一个 /data/loop_dir/ada 目录
print("文件路径:"+file_path)
print("解压路径:"+unzip_path)
if not os.path.exists(unzip_path):
os.mkdir(unzip_path)
subprocess.getoutput(f‘unzip -o {file_path} -d {unzip_path} ‘) #把zip 文件 解压到 /data/loop_dir/ada下
print(‘解压成功‘)
#打开 /data/loophole_dir/ada/index.html
f=open(f‘{unzip_path}/index.html‘)
index_str=f.read()
f.close()
detail_host=core.process_loop(index_str,unzip_path)
file_ok_path=file_path.replace(‘zip‘,‘ok‘) #替换成ok
subprocess.getoutput(f‘mv {file_path} {file_ok_path} ‘)
try:
core.process_loop_data(detail_host) #数据入库
except Exception as f :
logging.error(‘严重错误 :解析失败‘)
logging.exception(f)
def give_email(path):
‘‘‘下载邮箱附件‘‘‘
try:
# 输入邮件地址, 口令和POP3服务器地址:
# path = ‘test/‘
from_addr = ‘yuno_upport@yunochina.net‘
password = ‘kWFb463ccU54dwgr‘
eamil_server = ‘smtp.exmail.qq.com‘
email_class = down_email(user=from_addr, password=password, eamil_server=eamil_server)
email_class.run_ing(path)
return email_class
except Exception as e:
logging.error(‘严重错误 : 获取邮件错误‘)
logging.exception(e)
pass
def send_email(email_class):
#取出所有未发送的消息
#对负责人进行聚合
from backend.models import Loophole, Manage_user
unsend_query_list = Loophole.objects.filter(is_send=0).values(‘fx_id‘) # 查出这台主机的fx_id
unsend_query_list = list(set([i[‘fx_id‘] for i in unsend_query_list]))
# print(unsend_query_list)
manage_user_list = Manage_user.objects.filter(server__server__uuid__in=unsend_query_list) # 筛选出有关于漏洞所有的服务责任
for manage_user in manage_user_list:
# manage_user.ywfx_set.all().first()
ywfx_id_list = manage_user.server.all()
ywfx_id_list = [i.server_id for i in ywfx_id_list]
all_loop_for_the_manage = Loophole.objects.filter(fx_id__in=ywfx_id_list) # 这个负责人所有的漏洞
msg = []
for loop in all_loop_for_the_manage:
loop.is_send=1
loop.save()
msg.append(f‘漏洞名称:[{loop.loophole_name}] 漏洞详情[{loop.loophole_detail}] 漏洞级别【{loop.risk_level}】 CEV编号[{loop.CEV_num}]\n‘)
import base64
from email.mime.text import MIMEText
# 构造邮件,内容为hello world
msg=‘\n‘.join(msg)
msg = MIMEText(msg)
# 设置邮件主题
today_day=datetime.datetime.now().strftime(‘%Y-%m-%d‘)
msg["Subject"] = f"{today_day}:资管系统-自动化漏洞检测服务-最新漏洞"
# 寄件者
msg["From"] = ‘资管系统-自动化漏洞检测服务‘
# 收件者
msg["To"] = ‘责任人‘
email_class.send_mail(msg,[manage_user.email])
def job():
zip_path=‘/data/loophole/‘
if not os.path.exists(zip_path):
os.mkdir(zip_path)
email_class=give_email(zip_path) #返回email对象供下次调用
analysis_file(zip_path)
send_email(email_class) #读取所有未发送的漏洞 并发送
if __name__ == ‘__main__‘:
TEST=‘1‘
if TEST:
job()
else :
from apscheduler.schedulers.blocking import BlockingScheduler #
scheduler = BlockingScheduler()
scheduler.add_job(job, ‘interval‘,minutes=5)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass