This commit is contained in:
HuangHai
2026-01-13 21:58:42 +08:00
parent 46f17aed11
commit e25aa5b756
11 changed files with 309 additions and 34 deletions

View File

@@ -0,0 +1,85 @@
import asyncio
import os
import sys
import logging
# 确保项目根目录在 sys.path 中
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Util.RedisKit import RedisKit
from sqlalchemy.sql import text
from Config.Config import DB_URL
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ClearAiTeJiYiChong")
async def main():
"""
1. 删除数据库中所有 operator='艾特吉易充' 的记录
2. 删除 Redis 中所有关于艾特吉易充的缓存信息
"""
operator = '艾特吉易充'
# 1. 数据库清理
logger.info(f"开始清理数据库中 operator='{operator}' 的数据...")
db = Db(db_url=DB_URL)
await db.init_db()
try:
async with db.AsyncSessionLocal() as session:
async with session.begin():
# 先删除从表记录(通过 station_hash 关联)
# 1. t_station_status_scd
sql_status = """
DELETE FROM t_station_status_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_status_scd...")
result_status = await session.execute(text(sql_status), {"operator": operator})
logger.info(f"t_station_status_scd 已删除 {result_status.rowcount} 行记录。")
# 2. t_station_price_schedule_scd
sql_price = """
DELETE FROM t_station_price_schedule_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_price_schedule_scd...")
result_price = await session.execute(text(sql_price), {"operator": operator})
logger.info(f"t_station_price_schedule_scd 已删除 {result_price.rowcount} 行记录。")
# 3. 最后删除主表 t_station_profile_scd
sql_profile = "DELETE FROM t_station_profile_scd WHERE operator = :operator"
logger.info("正在清理 t_station_profile_scd...")
result_profile = await session.execute(text(sql_profile), {"operator": operator})
logger.info(f"t_station_profile_scd 已删除 {result_profile.rowcount} 行记录。")
logger.info("数据库记录清理完成。")
except Exception as e:
logger.error(f"数据库清理失败: {e}")
# 2. Redis 清理
logger.info("开始清理 Redis 中的缓存数据...")
redis_kit = RedisKit()
# 根据 Crawler.py 中的逻辑key 的模式为 crawled:aite:*
pattern = "crawled:aite:*"
try:
keys = await redis_kit.keys(pattern)
if keys:
logger.info(f"匹配到 {len(keys)} 个键,正在删除...")
await redis_kit.delete(*keys)
logger.info("Redis 缓存清理完成。")
else:
logger.info(f"未匹配到模式为 '{pattern}' 的键。")
except Exception as e:
logger.error(f"Redis 清理失败: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,85 @@
import asyncio
import os
import sys
import logging
# 确保项目根目录在 sys.path 中
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Util.RedisKit import RedisKit
from sqlalchemy.sql import text
from Config.Config import DB_URL
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ClearXinDianTu")
async def main():
"""
1. 删除数据库中所有 operator='新电途' 的记录
2. 删除 Redis 中所有关于新电途的缓存信息
"""
operator = '新电途'
# 1. 数据库清理
logger.info(f"开始清理数据库中 operator='{operator}' 的数据...")
db = Db(db_url=DB_URL)
await db.init_db()
try:
async with db.AsyncSessionLocal() as session:
async with session.begin():
# 先删除从表记录(通过 station_hash 关联)
# 1. t_station_status_scd
sql_status = """
DELETE FROM t_station_status_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_status_scd...")
result_status = await session.execute(text(sql_status), {"operator": operator})
logger.info(f"t_station_status_scd 已删除 {result_status.rowcount} 行记录。")
# 2. t_station_price_schedule_scd
sql_price = """
DELETE FROM t_station_price_schedule_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_price_schedule_scd...")
result_price = await session.execute(text(sql_price), {"operator": operator})
logger.info(f"t_station_price_schedule_scd 已删除 {result_price.rowcount} 行记录。")
# 3. 最后删除主表 t_station_profile_scd
sql_profile = "DELETE FROM t_station_profile_scd WHERE operator = :operator"
logger.info("正在清理 t_station_profile_scd...")
result_profile = await session.execute(text(sql_profile), {"operator": operator})
logger.info(f"t_station_profile_scd 已删除 {result_profile.rowcount} 行记录。")
logger.info("数据库记录清理完成。")
except Exception as e:
logger.error(f"数据库清理失败: {e}")
# 2. Redis 清理
logger.info("开始清理 Redis 中的缓存数据...")
redis_kit = RedisKit()
# 根据 Crawler.py 中的逻辑key 的模式为 crawled:xdt:*
pattern = "crawled:xdt:*"
try:
keys = await redis_kit.keys(pattern)
if keys:
logger.info(f"匹配到 {len(keys)} 个键,正在删除...")
await redis_kit.delete(*keys)
logger.info("Redis 缓存清理完成。")
else:
logger.info(f"未匹配到模式为 '{pattern}' 的键。")
except Exception as e:
logger.error(f"Redis 清理失败: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,86 @@
import asyncio
import os
import sys
import logging
# 确保项目根目录在 sys.path 中
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
from DbKit.Db import Db
from Util.RedisKit import RedisKit
from sqlalchemy.sql import text
from Config.Config import DB_URL
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ClearYeLiTe")
async def main():
"""
根据注释完成代码:
1. 删除数据库中所有 operator='驿来特' 的记录
2. 删除 Redis 中所有关于驿来特的缓存信息
"""
operator = '驿来特'
# 1. 数据库清理
logger.info(f"开始清理数据库中 operator='{operator}' 的数据...")
db = Db(db_url=DB_URL)
await db.init_db()
try:
async with db.AsyncSessionLocal() as session:
async with session.begin():
# 先删除从表记录(通过 station_hash 关联)
# 1. t_station_status_scd
sql_status = """
DELETE FROM t_station_status_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_status_scd...")
result_status = await session.execute(text(sql_status), {"operator": operator})
logger.info(f"t_station_status_scd 已删除 {result_status.rowcount} 行记录。")
# 2. t_station_price_schedule_scd
sql_price = """
DELETE FROM t_station_price_schedule_scd
WHERE station_hash IN (
SELECT station_hash FROM t_station_profile_scd WHERE operator = :operator
)
"""
logger.info("正在清理 t_station_price_schedule_scd...")
result_price = await session.execute(text(sql_price), {"operator": operator})
logger.info(f"t_station_price_schedule_scd 已删除 {result_price.rowcount} 行记录。")
# 3. 最后删除主表 t_station_profile_scd
sql_profile = "DELETE FROM t_station_profile_scd WHERE operator = :operator"
logger.info("正在清理 t_station_profile_scd...")
result_profile = await session.execute(text(sql_profile), {"operator": operator})
logger.info(f"t_station_profile_scd 已删除 {result_profile.rowcount} 行记录。")
logger.info("数据库记录清理完成。")
except Exception as e:
logger.error(f"数据库清理失败: {e}")
# 2. Redis 清理
logger.info("开始清理 Redis 中的缓存数据...")
redis_kit = RedisKit()
# 根据 Crawler.py 中的逻辑key 的模式为 crawled:ylt:*
pattern = "crawled:ylt:*"
try:
keys = await redis_kit.keys(pattern)
if keys:
logger.info(f"匹配到 {len(keys)} 个键,正在删除...")
await redis_kit.delete(*keys)
logger.info("Redis 缓存清理完成。")
else:
logger.info(f"未匹配到模式为 '{pattern}' 的键。")
except Exception as e:
logger.error(f"Redis 清理失败: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -198,17 +198,18 @@ class YiLaiTeCrawler(BaseCrawler):
logger.info("向下滚动以获取更多电价信息(从底部向上滑)...")
# User feedback: 必须放在下方,向上滚动
w, h = d.window_size()
# start_y=0.9 (底部), end_y=0.4 (上部)
d.swipe(w * 0.5, h * 0.9, w * 0.5, h * 0.4, 0.5)
await asyncio.sleep(1.5)
# 调整滑动参数:避开底部导航栏,增加滑动距离,放慢速度
# start_y=0.75, end_y=0.25
d.swipe(w * 0.5, h * 0.75, w * 0.5, h * 0.25, 0.8)
await asyncio.sleep(2.0) # 增加等待时间,确保滚动停止
# 截图2 (底部)
shot2 = take_screenshot(d, f"detail_price_{clean_station_name(name)}_{int(time.time())}_2")
detail_shots.append(shot2)
# 关闭分时段定价列表 (点击屏幕上半部分空白处)
# 关闭分时段定价列表 (点击屏幕最顶部空白处)
logger.info("点击屏幕上部空白处以关闭定价列表...")
d.click(w * 0.5, h * 0.2)
d.click(w * 0.5, h * 0.1)
await asyncio.sleep(1.0)
else:
logger.info("未发现‘阶段性电价’按钮,直接分析当前页")
@@ -305,11 +306,13 @@ class YiLaiTeCrawler(BaseCrawler):
except Exception as e:
logger.error(f"后台分析 {station_name} 失败: {e}")
finally:
if isinstance(image_paths, list):
for p in image_paths:
if os.path.exists(p): os.remove(p)
elif isinstance(image_paths, str) and os.path.exists(image_paths):
os.remove(image_paths)
# 调试阶段暂时不删除截图,方便排查 VLM 识别失败原因
pass
# if isinstance(image_paths, list):
# for p in image_paths:
# if os.path.exists(p): os.remove(p)
# elif isinstance(image_paths, str) and os.path.exists(image_paths):
# os.remove(image_paths)
async def main(service=None):
if service is None:

View File

@@ -50,13 +50,52 @@ class ReadImageKit:
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
# 强制写入文件调试
# debug_file = r"d:\dsWork\aiData\vlm_response_debug.txt"
# with open(debug_file, "a", encoding="utf-8") as f:
# f.write(f"\n\n=== {os.path.basename(image_path)} ===\n")
# f.write(str(res_text))
# f.write("\n====================================\n")
# 增加调试日志,查看 VLM 原始返回
logger.info(f"VLM Price Analysis Result for {os.path.basename(image_path)}: {res_text[:200]}...")
json_str = self.vlm.extract_json(res_text)
prices = json.loads(json_str)
# 兼容性处理:如果 VLM 返回了 time_range 字段,进行转换
normalized_prices = []
if isinstance(prices, list):
return prices
for p in prices:
new_p = p.copy()
# 处理时间段
if 'time_range' in p and ('start' not in p or 'end' not in p):
tr = p['time_range'].replace('~', '-').replace(' ', '')
parts = tr.split('-')
if len(parts) >= 2:
new_p['start'] = parts[0]
new_p['end'] = parts[1]
# 处理价格字段
if 'price' not in p:
if 'total_price' in p:
new_p['price'] = p['total_price']
elif 'elec_price' in p and 'service_price' in p:
try:
new_p['price'] = float(p['elec_price']) + float(p['service_price'])
except:
pass
normalized_prices.append(new_p)
return normalized_prices
return []
except Exception as e:
logger.error(f"分析电价详情失败: {e}")
if 'res_text' in locals():
logger.error(f"Failed VLM Response: {res_text}")
return []
@classmethod
@@ -157,26 +196,3 @@ class ReadImageKit:
except Exception as e:
logger.error(f"Hybrid 分析列表页失败: {e}")
return []
async def analyze_detail_price(self, image_path):
"""
分析详情页或三级价格页图片,提取分时电价
"""
prompt = """
分析这张充电站价格详情截图,提取完整的分时价格信息。
输出格式为 JSON 数组,每个对象包含:
- "time_range": 时间段 (例如 "00:00-08:00")
- "total_price": 总价 (电费+服务费)
- "elec_price": 电费 (如果能看到)
- "service_price": 服务费 (如果能看到)
如果没有看到分时段价格,请尝试寻找“价格详情”或“分时电价”按钮的坐标。
"""
try:
res_text = await self.vlm.analyze_image(image_path, prompt)
json_str = self.vlm.extract_json(res_text)
return json.loads(json_str)
except Exception as e:
logger.error(f"VLM 分析价格页失败: {e}")
return []