This commit is contained in:
HuangHai
2026-01-13 19:29:02 +08:00
parent 9fdbdaa338
commit a9b0d125d2
6 changed files with 106 additions and 6 deletions

View File

@@ -269,7 +269,7 @@ async def get_station_list(d, service, max_stations_count=MAX_STATIONS_COUNT):
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info(f"采集任务完成,共采集 {total_processed_count} 个场站。")
return True
return total_processed_count
async def analyze_prices_background(service, station_name, image_paths):
"""
@@ -313,21 +313,63 @@ async def main(service=None, do_cleanup=True):
if do_cleanup:
Kit.clear_temp_dir()
should_close_service = False
if service is None:
service = AiTeJiYiChongService()
should_close_service = True
await service.init_db()
d = u2.connect()
# 任务日志记录
from datetime import datetime
task_id = str(uuid.uuid4())
start_time = datetime.now()
operator = "艾特吉易充"
# 记录任务开始
try:
await get_station_list(d, service)
await service.db.save("t_crawl_task_log", {
"task_id": task_id,
"operator": operator,
"start_time": start_time,
"status": "running"
}, "task_id")
except Exception as e:
logger.error(f"无法记录任务开始日志: {e}")
total_count = 0
status = "success"
error_msg = None
try:
total_count = await get_station_list(d, service)
return True
except Exception as e:
logger.error(f"爬取过程中出现异常: {e}")
status = "failed"
error_msg = str(e)
return False
finally:
# 记录任务结束
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds())
try:
await service.db.update("t_crawl_task_log", {
"task_id": task_id,
"end_time": end_time,
"duration_seconds": duration,
"station_count": total_count,
"status": status,
"error_msg": error_msg
}, "task_id")
logger.info(f"任务执行日志已更新: {operator} | 耗时: {duration}s | 数量: {total_count} | 状态: {status}")
except Exception as e:
logger.error(f"无法更新任务执行日志: {e}")
# 如果是内部初始化的 service则在此关闭
pass
if should_close_service:
await service.close_db()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -429,7 +429,7 @@ async def get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS
await asyncio.gather(*background_tasks, return_exceptions=True)
logger.info("采集任务完成")
return True
return total_processed_count
def clean_images_dir():
@@ -486,6 +486,27 @@ async def main(service=None, do_cleanup=True):
uploader = ObsUploader()
redis_kit = RedisKit()
# 任务日志记录
from datetime import datetime
task_id = str(uuid.uuid4())
start_time = datetime.now()
operator = "新电途"
# 记录任务开始
try:
await service.db.save("t_crawl_task_log", {
"task_id": task_id,
"operator": operator,
"start_time": start_time,
"status": "running"
}, "task_id")
except Exception as e:
logger.error(f"无法记录任务开始日志: {e}")
total_count = 0
status = "success"
error_msg = None
try:
if do_cleanup:
# 清理图片目录
@@ -498,9 +519,9 @@ async def main(service=None, do_cleanup=True):
# await service.cleanup_old_data()
# 获取场站列表
stations = await get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT)
total_count = await get_station_list(d, service, uploader, max_stations_count=MAX_STATIONS_COUNT)
if stations:
if total_count:
logger.info("场站列表采集完成。")
else:
logger.warning("未采集到任何场站信息。")
@@ -509,8 +530,26 @@ async def main(service=None, do_cleanup=True):
except Exception as e:
logger.exception(f"运行过程中出现异常: {e}")
status = "failed"
error_msg = str(e)
return False
finally:
# 记录任务结束
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds())
try:
await service.db.update("t_crawl_task_log", {
"task_id": task_id,
"end_time": end_time,
"duration_seconds": duration,
"station_count": total_count,
"status": status,
"error_msg": error_msg
}, "task_id")
logger.info(f"任务执行日志已更新: {operator} | 耗时: {duration}s | 数量: {total_count} | 状态: {status}")
except Exception as e:
logger.error(f"无法更新任务执行日志: {e}")
# 关闭数据库连接
if should_close_service:
await service.close_db()

Binary file not shown.

View File

@@ -19,6 +19,25 @@ PROPERTIES (
"replication_num" = "1"
);
-- 爬取任务执行日志表
-- 用于记录每次爬取任务的执行情况,评估性能
DROP TABLE IF EXISTS t_crawl_task_log;
CREATE TABLE IF NOT EXISTS t_crawl_task_log (
`task_id` VARCHAR(50) NOT NULL COMMENT '任务唯一ID',
`operator` VARCHAR(50) NOT NULL COMMENT '爬取供应商/运营商',
`start_time` DATETIME NOT NULL COMMENT '任务开始时间',
`end_time` DATETIME COMMENT '任务结束时间',
`duration_seconds` INT COMMENT '耗时(秒)',
`station_count` INT DEFAULT '0' COMMENT '成功爬取的场站数量',
`status` VARCHAR(20) DEFAULT 'running' COMMENT '任务状态 (running, success, failed)',
`error_msg` TEXT COMMENT '错误信息'
)
UNIQUE KEY(`task_id`)
DISTRIBUTED BY HASH(`task_id`) BUCKETS 5
PROPERTIES (
"replication_num" = "1"
);
-- 充电站实时状态表 (SCD2 / Log)
-- 保存场站的桩数、空闲数等动态信息
-- 每次抓取如果状态变化则写入新记录