889 lines
34 KiB
Python
889 lines
34 KiB
Python
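"""
AMap (Gaode Maps) high-precision coordinate lookup tool.

Usage:
1. Register an account and complete real-name verification.
2. Log in manually (choose WeChat login) so that a cookie is generated.
3. Reuse that cookie: open the browser, pass in an organisation name, query
   its coordinates, and obtain coordinate data with six decimal places.
"""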
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
import time
|
||
import weakref
|
||
|
||
from selenium import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
|
||
# 添加SQLAlchemy同步操作
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
# 数据库连接配置
|
||
from Config.Config import POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DATABASE
|
||
|
||
# Build the synchronous PostgreSQL connection URL.
# The user name and password are percent-encoded so that reserved characters
# such as "@", ":" or "/" inside the credentials cannot be mistaken for URL
# delimiters when SQLAlchemy parses the string.
# NOTE(review): if the values in Config are already percent-encoded by hand,
# store them raw instead - they would otherwise be encoded twice.
from urllib.parse import quote as _quote_url_part

db_url = (
    f"postgresql://{_quote_url_part(str(POSTGRES_USER), safe='')}:"
    f"{_quote_url_part(str(POSTGRES_PASSWORD), safe='')}"
    f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DATABASE}"
)

# Module-wide engine and session factory used by the query helpers below.
engine = create_engine(db_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||
|
||
# 数据库操作函数
|
||
def find(sql, params=None):
    """Run a SQL query and return every row as a dictionary.

    Args:
        sql: SQL statement text; it is wrapped with ``text()`` before execution.
        params: Optional mapping of bind parameters for the statement.

    Returns:
        list: One ``dict`` per fetched row, keyed by the result's column names.
    """
    with SessionLocal() as db:
        cursor = db.execute(text(sql), params)
        records = cursor.fetchall()
        output = []
        for record in records:
            output.append(dict(zip(cursor.keys(), record)))
        return output
|
||
|
||
|
||
def findFirst(sql, params=None):
    """Run a SQL query and return only its first row.

    Args:
        sql: SQL statement text; it is wrapped with ``text()`` before execution.
        params: Optional mapping of bind parameters for the statement.

    Returns:
        dict or None: The first row keyed by column name, or ``None`` when
        the query produced no row.
    """
    with SessionLocal() as db:
        cursor = db.execute(text(sql), params)
        first = cursor.fetchone()
        return dict(zip(cursor.keys(), first)) if first else None
|
||
|
||
|
||
def execute_update(sql, params=None):
    """Execute a data-modifying SQL statement inside a transaction.

    Args:
        sql: SQL statement text; it is wrapped with ``text()`` before execution.
        params: Optional mapping of bind parameters for the statement.

    Returns:
        int: The ``rowcount`` reported for the executed statement.
    """
    # The session is opened first, then a transaction is begun on it; both
    # context managers are exited in reverse order when the block ends.
    with SessionLocal() as db, db.begin():
        outcome = db.execute(text(sql), params)
        return outcome.rowcount
|
||
|
||
|
||
# Whether to run the browser in headless mode (True = headless, False = visible window)
HEADLESS = False

# Maximum number of retries
# NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still needed
MAX_RETRIES = 1

# WebDriver wait timeouts, in seconds
TIMEOUT_SHORT = 5
TIMEOUT_MEDIUM = 10
TIMEOUT_LONG = 15
|
||
|
||
|
||
def parse_coordinates_info(text):
    """Extract one longitude/latitude pair from free-form coordinate text.

    Every ``<lng>,<lat>`` pair found in the text is checked against the
    bounding box used throughout this tool (longitude 70-140, latitude 0-60).
    When several pairs qualify, the one with the most decimal digits wins;
    ties go to the pair that appears first.

    Args:
        text: Raw text, e.g. the value of the picker's coordinate field.
            Whitespace, ASCII commas and full-width commas are all accepted
            as separators. (The parameter name shadows the module-level
            ``text`` import inside this function only; it is kept because it
            is part of the public signature.)

    Returns:
        dict or None: ``{'lng': str, 'lat': str}`` holding the digits exactly
        as they appeared in the input, or ``None`` when the input is empty,
        is not a string, or contains no valid pair.
    """
    if not text or not isinstance(text, str):
        return None

    # Collapse every run of separators (whitespace, ideographic space, ASCII
    # comma, full-width comma U+FF0C) into a single ASCII comma.
    cleaned = re.sub(r'[\u3000\t\n\r \uff0c,]+', ',', text).strip(',')

    # The look-behind stops the longitude from matching the tail of a longer
    # number (e.g. "116.39" inside "1116.39").
    pattern = (
        r'(?<![0-9.])'
        r'([7-9]?[0-9]\.[0-9]+|1[0-3][0-9]\.[0-9]+)'
        r',\s*'
        r'([1-8]?[0-9]\.[0-9]+|90\.[0-9]+)'
    )

    candidates = []
    for match in re.finditer(pattern, cleaned):
        lng, lat = match.group(1), match.group(2)
        # The regex guarantees plain ASCII decimals, so float() cannot fail.
        if 70 <= float(lng) <= 140 and 0 <= float(lat) <= 60:
            candidates.append((lng, lat))

    if not candidates:
        return None

    def _decimal_digits(pair):
        # Total number of digits after the decimal point in both values.
        return sum(len(value.partition('.')[2]) for value in pair)

    # max() returns the first maximal element, so ties keep input order.
    lng, lat = max(candidates, key=_decimal_digits)
    return {'lng': lng, 'lat': lat}
|
||
|
||
|
||
def export_cookies_from_browser(driver, output_file='amap_cookies.json'):
    """Dump the browser's current cookies to a JSON file.

    Args:
        driver: WebDriver instance whose cookies are read via ``get_cookies()``.
        output_file: Destination path of the JSON file.

    Returns:
        bool: ``True`` when the cookies were written, ``False`` when an
        argument is missing, the browser holds no cookies, or any step fails.
    """
    # Argument guards, checked in order: driver first, then the target path.
    for value, complaint in (
        (driver, "❌ WebDriver实例无效"),
        (output_file, "❌ Cookie输出文件路径无效"),
    ):
        if not value:
            print(complaint)
            return False

    try:
        print(f"正在导出Cookie到文件: {output_file}")

        # Create the parent directory when the path has one and it is missing.
        parent = os.path.dirname(output_file)
        if parent and not os.path.exists(parent):
            try:
                os.makedirs(parent, exist_ok=True)
            except Exception as mkdir_error:
                print(f"创建输出目录失败: {mkdir_error}")
                return False

        jar = driver.get_cookies()
        if not jar:
            print("⚠️ 未获取到任何Cookie")
            return False

        with open(output_file, 'w', encoding='utf-8') as handle:
            json.dump(jar, handle, ensure_ascii=False, indent=2)

        print(f"成功导出 {len(jar)} 个Cookie")
        print(f"Cookie已成功导出到: {os.path.abspath(output_file)}")
        return True
    except Exception as export_error:
        print(f"导出Cookie失败: {export_error}")
        return False
|
||
|
||
|
||
def load_cookies_to_browser(driver, cookie_file='amap_cookies.json'):
    """Load cookies from a JSON file into the browser session.

    The browser is first navigated to ``https://www.amap.com`` so that the
    cookies can be attached to that domain, then each cookie from the file is
    added individually. Cookies the browser rejects are skipped.

    Args:
        driver: WebDriver instance that receives the cookies.
        cookie_file: Path of the JSON file holding a list of cookie dicts.

    Returns:
        bool: ``True`` when at least one cookie was added; ``False`` when an
        argument is missing, the file does not exist or is not a JSON list,
        no cookie could be added, or any other step fails.
    """
    try:
        if not driver:
            print("❌ WebDriver实例无效")
            return False

        if not cookie_file:
            print("❌ Cookie文件路径无效")
            return False

        if not os.path.exists(cookie_file):
            print(f"❌ Cookie文件不存在: {cookie_file}")
            return False

        # NOTE(review): this disables TLS certificate verification for every
        # default HTTPS context created later in this process. Kept as-is
        # because other code may rely on it, but it looks unnecessary for
        # browser navigation - confirm and remove if possible.
        import ssl
        ssl._create_default_https_context = ssl._create_unverified_context

        # Visit the AMap domain first so the cookies can be set for it.
        driver.get('https://www.amap.com')
        time.sleep(1)  # give the page a moment to load

        with open(cookie_file, 'r', encoding='utf-8') as f:
            cookies = json.load(f)

        if not isinstance(cookies, list):
            print("❌ Cookie文件格式无效")
            return False

        success_count = 0
        for cookie in cookies:
            try:
                # Normalise attributes that are known to cause rejections.
                if 'expiry' in cookie:
                    cookie['expiry'] = int(cookie['expiry'])
                if 'sameSite' in cookie and cookie['sameSite'] not in ('Strict', 'Lax', 'None'):
                    del cookie['sameSite']
                driver.add_cookie(cookie)
            except Exception:
                # Best effort: a malformed or rejected cookie (e.g. one for a
                # different domain) must not abort the remaining ones.
                continue
            success_count += 1

        # Without a single accepted cookie the session is not authenticated,
        # so reporting success would mislead the callers.
        if success_count == 0:
            print("❌ 未能向浏览器添加任何Cookie")
            return False

        return True
    except Exception as e:
        print(f"加载Cookie失败: {str(e)}")
        return False
|
||
|
||
|
||
def _new_chrome_driver(headless):
    """Start a Chrome WebDriver configured the way this tool expects."""
    chrome_options = Options()
    if headless:
        chrome_options.add_argument('--headless')

    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--window-size=1920,1080')
    chrome_options.add_argument('--lang=zh-CN')

    # Reduce the visible traces of browser automation.
    chrome_options.add_experimental_option('useAutomationExtension', False)
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument('-ignore-certificate-errors')
    chrome_options.add_argument('-ignore-ssl-errors')
    # Both switches go into ONE call: experimental options are stored by
    # name, so a second 'excludeSwitches' call would replace the first.
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])

    # Randomised User-Agent
    user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 99)}.0.{random.randint(4000, 5000)}.{random.randint(100, 200)} Safari/537.36'
    chrome_options.add_argument(f'user-agent={user_agent}')

    # Prefer a driver binary from webdriver_manager; fall back to the default
    # Service() when the package is missing or the install fails.
    try:
        from webdriver_manager.chrome import ChromeDriverManager
        service = Service(ChromeDriverManager().install())
    except Exception:
        service = Service()

    new_driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        if not headless:
            new_driver.maximize_window()

        # Hide navigator.webdriver from page scripts.
        new_driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })
    except Exception:
        # Do not leave a half-configured browser running.
        new_driver.quit()
        raise
    return new_driver


def get_location_coordinates(location_name, headless=False, cookie_file='amap_cookies.json', driver=None,
                             reuse_driver=False):
    """Look up the coordinates of one named location with the AMap picker page.

    Args:
        location_name (str): Name of the place to search for.
        headless (bool): Run Chrome headless when a driver has to be created
            here. Ignored when ``driver`` is supplied.
        cookie_file (str): Path of the cookie file used for authentication.
        driver: Optional existing WebDriver to use. A supplied driver is
            never closed by this function.
        reuse_driver: Kept for backward compatibility. A driver created
            inside this call is always shut down before returning, because
            the caller has no handle to it.

    Returns:
        dict or None: ``{'lng': str, 'lat': str}`` on success; ``None`` when
        an argument is invalid, authentication fails, the page elements
        cannot be found, or no valid coordinate could be parsed.
    """
    if not location_name or not location_name.strip():
        print("❌ 地点名称无效")
        return None

    if not cookie_file or not os.path.exists(cookie_file):
        print(f"❌ Cookie文件不存在或无效: {cookie_file}")
        return None

    # True only when the browser was started by this call.
    owns_driver = False

    try:
        if not driver:
            driver = _new_chrome_driver(headless)
            owns_driver = True

        # Authenticate through the saved cookies.
        if not load_cookies_to_browser(driver, cookie_file):
            print("Cookie认证失败,无法继续获取高精度坐标")
            return None

        # Open the AMap coordinate picker; reloading it for every lookup also
        # resets the coordinate field.
        driver.get('https://lbs.amap.com/tools/picker')
        time.sleep(2)  # wait for the page to finish loading

        # Locate the search box.
        try:
            search_input = WebDriverWait(driver, TIMEOUT_MEDIUM).until(
                EC.presence_of_element_located((By.ID, 'txtSearch'))
            )
        except Exception as e:
            print(f"定位搜索输入框失败: {e}")
            return None

        search_input.clear()
        search_input.send_keys(location_name)

        # Click the search button; fall back to pressing Enter.
        try:
            search_button = WebDriverWait(driver, TIMEOUT_SHORT).until(
                EC.presence_of_element_located((By.CLASS_NAME, 'btn-search'))
            )
            search_button.click()
        except Exception as e:
            print(f"定位搜索按钮失败: {e}")
            try:
                search_input.send_keys(Keys.ENTER)
            except Exception as e2:
                print(f"回车键搜索也失败: {e2}")
                return None

        # Wait for the search result and its coordinates to be displayed.
        time.sleep(3)

        # Read the coordinate from the txtCoordinate element: its value
        # attribute first, its text content as a fallback.
        coordinate_text = None
        try:
            coordinate_element = WebDriverWait(driver, TIMEOUT_MEDIUM).until(
                EC.presence_of_element_located((By.ID, 'txtCoordinate'))
            )

            coordinate_value = coordinate_element.get_attribute('value')
            if coordinate_value and coordinate_value.strip():
                coordinate_text = coordinate_value.strip()
            else:
                print("警告: txtCoordinate元素的value属性为空")
                coordinate_text_content = coordinate_element.text.strip()
                if coordinate_text_content:
                    coordinate_text = coordinate_text_content
                    print(f"从text内容获取到坐标文本: '{coordinate_text}'")
        except Exception as e:
            print(f"查找或获取坐标元素失败: {e}")
            return None

        if not coordinate_text or coordinate_text.strip() == '':
            print("未能获取到有效的坐标文本")
            return None

        print(f"解析坐标文本: '{coordinate_text}'")

        coordinate_info = parse_coordinates_info(coordinate_text)
        if not coordinate_info:
            print("坐标解析失败")
            return None

        # Re-validate the parsed values against the bounding box used by this
        # tool (longitude 70-140, latitude 0-60).
        try:
            lng = float(coordinate_info['lng'])
            lat = float(coordinate_info['lat'])
        except (ValueError, TypeError):
            print(f"坐标数值验证失败: {coordinate_info}")
            return None

        if 70 <= lng <= 140 and 0 <= lat <= 60:
            print(f"成功获取坐标: {coordinate_info['lng']},{coordinate_info['lat']}")
            return coordinate_info

        print(f"坐标超出中国地区合理范围 - 经度: {lng}, 纬度: {lat}")
        return None

    except Exception as e:
        print(f"获取坐标时发生错误: {e}")
        return None
    finally:
        # quit() closes every window and stops the driver process; it is
        # guarded so that a cleanup failure cannot replace the return value.
        if owns_driver and driver:
            try:
                driver.quit()
            except Exception as quit_error:
                print(f"关闭浏览器失败: {quit_error}")
|
||
|
||
|
||
def get_multiple_locations_coordinates(location_names, headless=False, cookie_file='amap_cookies.json'):
    """Look up coordinates for several locations with one shared browser.

    A single Chrome instance is started, authenticated with the cookie file,
    and handed to ``get_location_coordinates`` for every name. Each result is
    then classified (success / failed / skipped on error / invalid) and a
    summary is printed at the end.

    Args:
        location_names (list): Names of the places to look up.
        headless (bool): Run Chrome headless; defaults to a visible window.
        cookie_file (str): Path of the cookie file used for authentication.

    Returns:
        dict: Maps each processed name to its ``{'lng', 'lat'}`` dict, or to
        ``None`` when the lookup failed. Empty when the input is empty or the
        browser could not be started or authenticated.
    """
    if not location_names:
        print("❌ 地点列表为空,无需获取坐标")
        return {}

    print(f"开始批量获取坐标信息 - 地点数量: {len(location_names)}, 无头模式: {headless}")

    all_coordinates = {}
    success_count = 0
    failure_count = 0
    skipped_count = 0
    invalid_coord_count = 0

    # Stays None until the browser has actually been started.
    created_driver = None

    try:
        chrome_options = Options()
        if headless:
            chrome_options.add_argument('--headless')

        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--lang=zh-CN')
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument('-ignore-certificate-errors')
        chrome_options.add_argument('-ignore-ssl-errors')
        # Both switches go into ONE call: experimental options are stored by
        # name, so a second 'excludeSwitches' call would replace the first.
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        # Randomised User-Agent
        user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 99)}.0.{random.randint(4000, 5000)}.{random.randint(100, 200)} Safari/537.36'
        chrome_options.add_argument(f'user-agent={user_agent}')

        # Prefer a driver binary from webdriver_manager; fall back to the
        # default Service() when the package is missing or the install fails.
        try:
            from webdriver_manager.chrome import ChromeDriverManager
            driver_path = ChromeDriverManager().install()
            service = Service(driver_path)
            print(f"成功使用webdriver_manager安装驱动: {driver_path}")
        except Exception:
            print("使用系统默认的Chrome驱动")
            service = Service()

        created_driver = webdriver.Chrome(service=service, options=chrome_options)

        # Register the driver in the module-level tracking set when it exists
        # (it is only defined when the file runs as a script).
        if 'driver_instances' in globals():
            driver_instances.add(created_driver)

        if not headless:
            created_driver.maximize_window()

        # Hide navigator.webdriver from page scripts.
        created_driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

        print("尝试使用Cookie文件进行认证")
        cookies_loaded = load_cookies_to_browser(created_driver, cookie_file)
        if not cookies_loaded:
            print("Cookie认证失败,无法继续获取高精度坐标")
            return all_coordinates

        url = 'https://lbs.amap.com/tools/picker'
        created_driver.get(url)
        time.sleep(2)  # wait for the page to finish loading

        for i, location_name in enumerate(location_names, 1):
            print(f"处理地点 {i}/{len(location_names)}: '{location_name}'")

            errored = False
            try:
                result = get_location_coordinates(location_name, headless=headless, cookie_file=cookie_file,
                                                  driver=created_driver, reuse_driver=True)
            except Exception as e:
                print(f"❌ 地点: {location_name} - 查询出错: {str(e)},跳过该地点")
                result = None
                errored = True
                skipped_count += 1

            all_coordinates[location_name] = result

            if errored:
                # Already counted as skipped; counting it as a failure too
                # would inflate the processed total.
                pass
            elif result:
                try:
                    lng = float(result['lng'])
                    lat = float(result['lat'])

                    if 70 <= lng <= 140 and 0 <= lat <= 60:
                        # Require at least 4 decimal places on both values.
                        lng_precision = len(result['lng'].split('.')[1]) if '.' in result['lng'] else 0
                        lat_precision = len(result['lat'].split('.')[1]) if '.' in result['lat'] else 0

                        if lng_precision >= 4 and lat_precision >= 4:
                            print(f"✅ 地点: {location_name} - 坐标获取成功: {result['lng']},{result['lat']}")
                            success_count += 1
                        else:
                            print(
                                f"⚠️ 地点: {location_name} - 坐标精度不足: {result['lng']},{result['lat']} (经度{lng_precision}位, 纬度{lat_precision}位小数)")
                            invalid_coord_count += 1
                    else:
                        print(f"⚠️ 地点: {location_name} - 坐标超出中国地区合理范围: {result['lng']},{result['lat']}")
                        invalid_coord_count += 1
                except (ValueError, TypeError):
                    print(f"⚠️ 地点: {location_name} - 坐标数值无效: {result}")
                    invalid_coord_count += 1
            else:
                print(f"❌ 地点: {location_name} - 未能获取有效的坐标信息")
                failure_count += 1

            # Short random pause between lookups to avoid hammering the site.
            if i < len(location_names):
                delay_time = random.uniform(0.5, 2)
                print(f"添加延迟: {delay_time:.2f}秒")
                time.sleep(delay_time)

    except Exception as e:
        print(f"❌ 批量坐标获取过程中发生错误: {str(e)}")

    finally:
        # The driver may never have been created; quit() (not just close())
        # is needed to stop the driver process as well as the window.
        if created_driver is not None:
            try:
                created_driver.quit()
            except Exception as quit_error:
                print(f"关闭浏览器失败: {quit_error}")

    # Summary
    print(f"\n{'=' * 60}")
    print("批量坐标获取完成")
    print(f"{'=' * 60}")
    print(f"总查询地点数: {len(location_names)}")
    print(f"成功获取: {success_count}")
    print(f"获取失败: {failure_count}")
    print(f"超时跳过: {skipped_count}")
    print(f"无效坐标: {invalid_coord_count}")
    print(f"获取到坐标数据的地点数: {len([c for c in all_coordinates.values() if c is not None])}")

    # Success rate over the locations that were actually processed.
    actual_processed = success_count + failure_count + skipped_count + invalid_coord_count
    if actual_processed > 0:
        actual_success_rate = (success_count / actual_processed) * 100
        print(f"实际成功率: {actual_success_rate:.1f}%")
    else:
        print("实际成功率: 0%")

    print(f"{'=' * 60}")

    return all_coordinates
|
||
|
||
|
||
def create_cookie_export_tool():
    """Interactively export the cookies of a manually logged-in AMap session.

    Opens a visible Chrome window on the AMap coordinate picker, waits until
    the user has logged in and pressed Enter, then writes the browser cookies
    to ``../Test/amap_cookies.json`` relative to this script's directory.
    The browser is always shut down before the function returns.
    """
    print("\n===== Cookie导出工具 =====")
    print("此工具将打开一个Chrome浏览器窗口,让您手动登录高德地图账号,")
    print("登录成功后,按Enter键导出Cookie到文件。")
    print("==========================")

    chrome_options = Options()
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--window-size=1920,1080')
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option('useAutomationExtension', False)

    driver = None
    try:
        # Prefer a driver binary from webdriver_manager; fall back to the
        # default Service() when the package is missing or the install fails.
        # "except Exception" lets Ctrl+C / SystemExit propagate.
        try:
            from webdriver_manager.chrome import ChromeDriverManager
            driver_path = ChromeDriverManager().install()
            service = Service(driver_path)
        except Exception:
            service = Service()

        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.maximize_window()

        # Hide navigator.webdriver from page scripts.
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

        url = 'https://lbs.amap.com/tools/picker'
        print(f"\n正在打开高德地图坐标拾取工具: {url}")
        # NOTE(review): this disables TLS certificate verification for every
        # default HTTPS context created later in this process. Kept as-is
        # because other code may rely on it, but it looks unnecessary for
        # browser navigation - confirm and remove if possible.
        import ssl
        ssl._create_default_https_context = ssl._create_unverified_context
        driver.get(url)

        print("\n请在浏览器中手动登录高德地图账号。")
        print("提示:建议使用微信扫码登录以避免验证码问题")
        print("登录成功后,请按Enter键继续...")
        input()

        # Export next to this script, independent of the working directory.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        cookie_file = os.path.join(script_dir, '../Test/amap_cookies.json')
        export_success = export_cookies_from_browser(driver, cookie_file)

        if export_success:
            print(f"\nCookie导出成功!文件保存为: {os.path.abspath(cookie_file)}")
            print("您现在可以使用这个Cookie文件获取高精度坐标了。")
            print("\n重要提示:")
            print("1. 请妥善保管您的Cookie文件,不要分享给他人")
            print("2. Cookie文件包含您的登录状态信息,请确保安全存储")
            print("3. 当Cookie过期后,您需要重新运行此工具生成新的Cookie文件")

    except Exception as e:
        print(f"\n发生错误: {str(e)}")
    finally:
        if driver:
            print("\n正在关闭浏览器...")
            driver.quit()
|
||
|
||
|
||
def main():
    """Interactive entry point: fill in missing school coordinates.

    Steps:
      1. Optionally reset every ``t_school`` longitude/latitude to 0.
      2. Select the schools whose longitude and latitude are both 0.
      3. Make sure a cookie file exists (offering to run the export tool).
      4. Start one Chrome instance, authenticate it with the cookies and look
         up each school, writing every hit to the database immediately.

    The process exits through ``sys.exit`` when the database cannot be
    queried, when nothing needs updating, or when no cookie file is available.
    """
    print("=== 高德地图高精度坐标获取工具 ====")

    # Resolve the cookie file relative to this script, not the working directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    cookie_file = os.path.join(script_dir, '../Test/amap_cookies.json')

    # Optionally wipe all coordinates to start from scratch.
    clear_coords = input("是否清空所有数据的经纬度,全新开始更新坐标?(y/n,默认n): ").strip().lower() == 'y'
    if clear_coords:
        print("正在清空所有学校的经纬度数据...")
        try:
            execute_update("UPDATE t_school SET longitude=0, latitude=0")
            print("✅ 经纬度数据已清空")
        except Exception as e:
            print(f"❌ 清空经纬度数据失败: {str(e)}")
            sys.exit(1)

    # Schools still lacking coordinates (both values equal to 0).
    print("正在从数据库查询需要更新坐标的学校...")
    try:
        results = find("SELECT org_name FROM t_school WHERE longitude=0 AND latitude=0 order by region_level1,region_level2,region_level3,school_type")
        locations = [result['org_name'] for result in results]
        print(f"✅ 找到 {len(locations)} 所需要更新坐标的学校")

        if not locations:
            print("所有学校都已有坐标数据,程序退出。")
            sys.exit(0)

        # Preview the first 10 schools.
        print("\n待处理学校列表预览:")
        for i, loc in enumerate(locations[:10], 1):
            print(f" {i}. {loc}")
        if len(locations) > 10:
            print(f" ... 等{len(locations) - 10}个学校")

    except Exception as e:
        print(f"❌ 查询学校列表失败: {str(e)}")
        sys.exit(1)

    # Make sure a cookie file is available.
    if not os.path.exists(cookie_file):
        print(f"\nCookie文件 '{cookie_file}' 不存在。")
        if input("是否现在运行Cookie导出工具? (y/n): ").strip().lower() == 'y':
            create_cookie_export_tool()
            # The export can fail; do not start the browser without a file.
            if not os.path.exists(cookie_file):
                print("\n程序退出。没有有效的Cookie文件,无法获取高精度坐标。")
                sys.exit(1)
        else:
            print("\n程序退出。没有有效的Cookie文件,无法获取高精度坐标。")
            sys.exit(1)

    print(f"\n运行参数: 无头模式{'启用' if HEADLESS else '禁用'}")

    print("\n开始逐个获取坐标并更新数据库...")
    success_updates = 0
    total_locations = len(locations)

    # One browser instance is shared by all lookups.
    chrome_options = Options()
    if HEADLESS:
        chrome_options.add_argument('--headless')

    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--window-size=1920,1080')
    chrome_options.add_argument('--lang=zh-CN')
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument('-ignore-certificate-errors')
    chrome_options.add_argument('-ignore-ssl-errors')
    # Both switches go into ONE call: experimental options are stored by
    # name, so a second 'excludeSwitches' call would replace the first.
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
    chrome_options.add_experimental_option('useAutomationExtension', False)

    # Randomised User-Agent
    user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 99)}.0.{random.randint(4000, 5000)}.{random.randint(100, 200)} Safari/537.36'
    chrome_options.add_argument(f'user-agent={user_agent}')

    # Prefer a driver binary from webdriver_manager; fall back to the default
    # Service() when the package is missing or the install fails.
    try:
        from webdriver_manager.chrome import ChromeDriverManager
        driver_path = ChromeDriverManager().install()
        service = Service(driver_path)
        print(f"成功使用webdriver_manager安装驱动: {driver_path}")
    except Exception:
        print("使用系统默认的Chrome驱动")
        service = Service()

    driver = webdriver.Chrome(service=service, options=chrome_options)

    # Everything after the browser has started runs under try/finally so the
    # browser is shut down even when setup or a lookup fails.
    try:
        # Register the driver in the module-level tracking set when it exists.
        if 'driver_instances' in globals():
            driver_instances.add(driver)

        if not HEADLESS:
            driver.maximize_window()

        # Hide navigator.webdriver from page scripts.
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })

        # Authenticate through the saved cookies.
        cookies_loaded = load_cookies_to_browser(driver, cookie_file)
        if not cookies_loaded:
            print("Cookie认证失败,无法继续获取高精度坐标")
            return

        url = 'https://lbs.amap.com/tools/picker'
        driver.get(url)
        time.sleep(2)  # wait for the page to finish loading

        # NOTE(review): rows are matched by org_name, so schools sharing a
        # name all receive the same coordinates - confirm names are unique.
        update_sql = "UPDATE t_school SET longitude = :lng, latitude = :lat WHERE org_name = :org_name"

        for i, location_name in enumerate(locations, 1):
            print(f"处理地点 {i}/{total_locations}: '{location_name}'")

            try:
                coords = get_location_coordinates(
                    location_name,
                    headless=HEADLESS,
                    cookie_file=cookie_file,
                    driver=driver,
                    reuse_driver=True
                )
            except Exception as e:
                print(f"❌ 地点: {location_name} - 查询出错: {str(e)}")
            else:
                if coords:
                    # Persist each hit immediately so progress survives a crash.
                    try:
                        execute_update(update_sql, {
                            'lng': coords['lng'],
                            'lat': coords['lat'],
                            'org_name': location_name
                        })
                        success_updates += 1
                        print(f"✅ 地点: {location_name} - 坐标获取成功并更新数据库: {coords['lng']},{coords['lat']}")
                    except Exception as e:
                        print(f" ❌ 更新地点'{location_name}'失败: {str(e)}")
                else:
                    print(f"❌ 地点: {location_name} - 未能获取有效的坐标信息")

            # Short random pause between lookups to avoid hammering the site.
            if i < total_locations:
                delay_time = random.uniform(0.5, 2)
                print(f"添加延迟: {delay_time:.2f}秒")
                time.sleep(delay_time)

    finally:
        # quit() closes every window and stops the driver process; it is
        # guarded so a cleanup failure cannot hide the real outcome.
        try:
            driver.quit()
        except Exception as quit_error:
            print(f"关闭浏览器失败: {quit_error}")

    print(f"\n✅ 数据库更新完成,成功{success_updates}/{total_locations}条记录")

    print("\n✅ 程序执行完成!")
    print("\n提示:如需重新批量更新所有学校坐标,下次运行程序时选择清空所有数据的经纬度即可。")
|
||
|
||
|
||
if __name__ == "__main__":
    # Module-level set used to track every WebDriver instance that is created.
    # It only exists when the file runs as a script, which is why the
    # functions above test for it with `'driver_instances' in globals()`.
    driver_instances = weakref.WeakSet()

    try:
        print("=== 高德地图高精度坐标获取工具 ===")
        print("正在初始化...")
        # Run the main program directly
        main()
    except KeyboardInterrupt:
        # Ctrl+C: report it and fall through to the normal exit message.
        print("\n⚠️ 接收到键盘中断,退出程序")
    except Exception as main_error:
        # Last-resort handler so an unexpected error is reported, not traced.
        print(f"\n❌ 主程序运行出错: {main_error}")

    print("\n程序已退出")
|