# Code-viewer metadata (extraction residue): 146 lines, 6.4 KiB, Python
import asyncio
|
|
import os
|
|
import sys
|
|
import logging
|
|
from sqlalchemy import text
|
|
|
|
# Configure logging for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Environment setup notes:
#   python -m pip install webdriver-manager selenium
#   Chrome for Testing index:
#     https://googlechromelabs.github.io/chrome-for-testing/
#   Example chromedriver download:
#     https://storage.googleapis.com/chrome-for-testing-public/143.0.7499.192/win64/chromedriver-win64.zip
#
# Developer SQL snippet (presumably used to reset one row so the backfill can
# be re-tested -- confirm before running):
#   update t_station_profile_scd set coord_x=null,coord_y=null where id='95783932-1d32-48e4-9a38-6a6696a6a423'

# Add the project root (the parent of this file's directory) to sys.path so
# that the Util, DbKit and Config packages can be imported. Skip the append
# when the entry is already present so repeated imports do not grow sys.path.
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _project_root not in sys.path:
    sys.path.append(_project_root)
|
|
|
|
from Util.PaChongGaoDeKit import PaChongGaoDeKit
|
|
from DbKit.Db import Db
|
|
from Config.Config import DB_URL
|
|
|
|
class CookieInvalidError(Exception):
    """Custom exception signalling that the cookie is invalid or has expired."""
|
|
|
|
async def main(check_cookie_first: bool = True) -> None:
    """Backfill missing station coordinates using Amap (Gaode Maps).

    Selects every distinct, non-empty ``address`` in ``t_station_profile_scd``
    whose ``coord_x`` or ``coord_y`` is NULL. For each address, coordinates
    already stored on another row with the same address are reused; only when
    none exist is ``PaChongGaoDeKit.get_coordinate`` called. When a coordinate
    pair is obtained, all rows with that address are updated and the change is
    committed immediately, so progress survives a later failure.

    :param check_cookie_first: When true, validate the cookie before doing any
        work: the cookie file must exist and a probe lookup of a well-known
        address must return a longitude.
    :raises CookieInvalidError: If ``check_cookie_first`` is true and either
        the cookie file is missing or the probe lookup returns nothing.

    Any other exception raised while processing is logged (with traceback)
    and not re-raised. The scraper and the database are always closed.
    """
    logger.info("正在启动 T4_Coord: 高德地图坐标回填任务...")

    # Open the database connection.
    db = Db(db_url=DB_URL)
    await db.init_db()

    # Resolve the cookie file relative to the project root so the script can
    # be launched from any working directory.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    cookie_path = os.path.join(project_root, 'Json', 'amap_cookies.json')

    if not os.path.exists(cookie_path):
        logger.error(f"错误:未找到 Cookie 文件:{cookie_path}")
        # Release the connection on BOTH exits; previously the non-raising
        # path returned without closing it.
        await db.close()
        if check_cookie_first:
            # Raise so the caller can react (e.g. regenerate the cookie).
            raise CookieInvalidError("Cookie file not found")
        logger.error("请先运行 Tools/T3_GetAmapCookie.py 生成有效 Cookie。")
        return

    # NOTE(review): the original comment said "headless mode", but
    # headless=False is what is actually passed; the value is kept unchanged
    # -- confirm which one is intended.
    try:
        kit = PaChongGaoDeKit(cookie_file=cookie_path, headless=False)
    except Exception:
        # Do not leak the database connection if the scraper cannot start.
        await db.close()
        raise

    try:
        # get_coordinate is synchronous (Selenium, per the original comment),
        # so it is run in the default executor to keep the event loop free.
        loop = asyncio.get_running_loop()

        # 0. Cookie validity probe.
        if check_cookie_first:
            logger.info("正在验证高德地图 Cookie 有效性...")
            # Probe with an address that is certain to exist.
            test_addr = "北京市天安门"
            t_lng, t_lat = await loop.run_in_executor(None, kit.get_coordinate, test_addr)

            if not t_lng:
                logger.warning(f"Cookie 验证失败:无法获取测试地址 '{test_addr}' 的坐标。")
                logger.warning("判定 Cookie 已失效或过期。")
                raise CookieInvalidError("Cookie invalid or expired")
            logger.info(f"Cookie 验证通过 (测试坐标: {t_lng}, {t_lat})。")

        # Statements are loop-invariant, so build them once.
        missing_stmt = text("""
            SELECT DISTINCT address
            FROM t_station_profile_scd
            WHERE address IS NOT NULL
              AND address != ''
              AND (coord_x IS NULL OR coord_y IS NULL)
        """)
        existing_stmt = text("""
            SELECT coord_x, coord_y
            FROM t_station_profile_scd
            WHERE address = :addr
              AND coord_x IS NOT NULL
              AND coord_y IS NOT NULL
            LIMIT 1
        """)
        update_stmt = text("""
            UPDATE t_station_profile_scd
            SET coord_x = :lng, coord_y = :lat
            WHERE address = :addr
        """)

        async with await db.get_session() as session:
            # 1. Collect every distinct non-empty address lacking a coordinate.
            result = await session.execute(missing_stmt)
            missing_addresses = [row[0] for row in result.fetchall()]
            logger.info(f"共发现 {len(missing_addresses)} 个缺失坐标的唯一地址。")

            for address in missing_addresses:
                logger.info(f"\n正在处理地址: {address}")

                # 2. Reuse coordinates already stored for the same address on
                #    another row, to minimise calls to Amap.
                existing = await session.execute(existing_stmt, {"addr": address})
                row = existing.fetchone()

                if row:
                    lng, lat = row[0], row[1]
                    logger.info(f" -> 在数据库中找到现有坐标: {lng}, {lat}")
                else:
                    # 3. Nothing in the database: ask Amap.
                    logger.info(" -> 数据库中无记录,正在查询高德地图...")
                    lng, lat = await loop.run_in_executor(None, kit.get_coordinate, address)
                    if lng and lat:
                        logger.info(f" -> 高德地图返回坐标: {lng}, {lat}")
                    else:
                        logger.info(" -> 高德地图未返回有效坐标。")

                if not (lng and lat):
                    logger.info(" -> 跳过更新 (未找到有效坐标)。")
                    continue

                # 4. Write the coordinates to every row with this address and
                #    commit per address.
                await session.execute(update_stmt, {"lng": lng, "lat": lat, "addr": address})
                await session.commit()
                logger.info(f" -> 已成功更新数据库中地址为 '{address}' 的记录。")

    except CookieInvalidError:
        # Must reach the caller, consistent with the missing-file case above;
        # the blanket handler below used to swallow it.
        raise
    except Exception as e:
        # Best-effort task: log with traceback and finish without re-raising.
        logger.exception(f"执行过程中出错: {e}")
    finally:
        # Release resources; close the database even if closing the scraper fails.
        try:
            kit.close()
        finally:
            await db.close()
            logger.info("\n任务执行结束。")
|
|
|
|
if __name__ == "__main__":
    # On Windows, select the selector-based event loop policy before the loop
    # is created.
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    try:
        asyncio.run(main())
    except CookieInvalidError:
        # main() has already logged the cause. An invalid or missing cookie is
        # an expected operational condition, so print the remediation hint and
        # exit non-zero instead of dumping a traceback.
        logger.error("请先运行 Tools/T3_GetAmapCookie.py 生成有效 Cookie。")
        sys.exit(1)
|