python主程序和两个静态页面,实现ansible批量执行更新脚本
app.py
import os
import logging
from flask import Flask, request, render_template, redirect, url_for, session, render_template_string
import hashlib
import subprocess
import datetime
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SECRET_KEY'] = '自定义32位字符串'
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:passwd@10.0.0.1/python_db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = 'users' # 指定表名为 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
class LoginLog(db.Model):
__tablename__ = 'login_logs' # 指定表名为 'login_logs'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), nullable=False)
login_time = db.Column(db.DateTime, nullabl
import requests
from fake_useragent import UserAgent
import csv
from datetime import datetime
def convert_created_at(created_at):
dt = datetime.strptime(created_at, '%a %b %d %H:%M:%S +0800 %Y')
formatted_created_at = dt.strftime('%Y-%m-%d %H:%M:%S')
return formatted_created_at
def get_weibo_mid(weibo_id):
url = f'https://weibo.com/ajax/statuses/show?id={weibo_id}'
header = {
'user-agent': UserAgent().random
}
response = requests.get(url=url, headers=header)
if response.status_code == 200:
try:
json_data = response.json()
weibo_mid = json_data.get('mid')
if weibo_mid:
return weibo_mid
else:
print("未找到微博 MID。")
except Exception as e:
print(f"解析微博 MID 时发生异常:{e}")
else:
print(f"无法获取微博 MID。状态码:{response.status_code}")
return None
def get_weibo_comments(weibo_id, csv_filename):
header = {
import openpyxl
import os
from copy import copy
def get_cell(sheet, cell):
# 如果是合并单元格,找到合并区域的起始单元格
for merged_cell_range in sheet.merged_cells.ranges:
if cell.coordinate in merged_cell_range:
return sheet[merged_cell_range.coord.split(':')[0]]
return cell
def copy_cell_format(src_cell, dest_cell):
if src_cell.has_style:
dest_cell.font = copy(src_cell.font)
dest_cell.border = copy(src_cell.border)
dest_cell.fill = copy(src_cell.fill)
dest_cell.number_format = src_cell.number_format
dest_cell.protection = copy(src_cell.protection)
dest_cell.alignment = copy(src_cell.alignment)
def copy_row(sheet_src, sheet_dest, src_row_num, dest_row_num):
src_row = sheet_src[src_row_num]
for cell in src_row:
dest_cell = sheet_dest.cell(row=dest_row_num, column=cell.column, value=cell.value)
copy_cell_format(cell, dest_cell)
# 复制行高
sheet_dest.row_dimensi