268 lines
10 KiB
Python
268 lines
10 KiB
Python
"""Utility class for obtaining high-precision coordinates from Amap (Gaode Maps)."""
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
from selenium import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
|
||
class PaChongGaoDeKit:
    """Selenium-driven helper that reads coordinates from the Amap picker page.

    The kit lazily starts a Chrome session, optionally restores cookies saved
    by :meth:`login_and_save_cookies`, opens
    ``https://lbs.amap.com/tools/picker`` and reads the coordinate text box
    after submitting an address search.

    The class is a context manager; leaving the ``with`` block calls
    :meth:`close`.

    Attributes:
        headless: Whether Chrome is started with ``--headless``.
        cookie_file: Path of the JSON file cookies are loaded from / saved to.
        driver: The active Selenium WebDriver, or ``None`` when not started.
    """

    def __init__(self, headless=True, cookie_file='amap_cookies.json'):
        """Store the configuration; the browser is not started here.

        Args:
            headless: Start Chrome in headless mode when the driver is created.
            cookie_file: Cookie JSON path. A relative path is resolved against
                the directory containing this module when a file exists there;
                otherwise it is kept as given (relative to the working
                directory at the time it is opened).
        """
        self.headless = headless
        self.cookie_file = cookie_file
        self.driver = None

        if not os.path.isabs(self.cookie_file):
            module_dir = os.path.dirname(os.path.abspath(__file__))
            candidate = os.path.join(module_dir, self.cookie_file)
            if os.path.exists(candidate):
                self.cookie_file = candidate

    def _init_driver(self):
        """Start Chrome if no driver exists yet.

        Tries ``webdriver_manager`` to obtain a ChromeDriver binary and falls
        back to Selenium's default ``Service()`` when that package is missing
        or fails.

        Raises:
            Exception: Whatever Selenium raised while launching or configuring
                Chrome. The browser is closed before the error propagates.
        """
        if self.driver:
            return

        print(f"正在启动 Chrome 浏览器 (无头模式={self.headless})...")
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')

        # Sandbox/shared-memory flags, a fixed viewport and locale, plus
        # switches that hide the usual automation markers from the page.
        for argument in (
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--window-size=1920,1080',
            '--lang=zh-CN',
            "--disable-blink-features=AutomationControlled",
        ):
            chrome_options.add_argument(argument)
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        service = Service()
        try:
            # Only ChromeDriverManager is needed. Importing anything else from
            # webdriver_manager here would make an unrelated ImportError look
            # like "package not installed".
            from webdriver_manager.chrome import ChromeDriverManager

            # Let ChromeDriverManager download a matching ChromeDriver.
            driver_path = ChromeDriverManager().install()
            service = Service(driver_path)
            print(f"已成功加载 ChromeDriver: {driver_path}")
        except ImportError:
            print("提示:未安装 webdriver_manager 库,建议安装以获得更好的驱动管理体验:pip install webdriver-manager")
        except Exception as e:
            print(f"提示:自动管理驱动失败,尝试使用系统默认驱动。错误原因: {e}")

        try:
            self.driver = webdriver.Chrome(service=service, options=chrome_options)

            if not self.headless:
                self.driver.maximize_window()

            # Mask navigator.webdriver on every new document.
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            })
        except Exception as e:
            print(f"致命错误:无法启动 Chrome 浏览器。请确保已安装 Chrome 且版本与驱动匹配。详情: {e}")
            # Do not leave a half-configured browser running or cached.
            self.close()
            raise

    def _load_cookies(self):
        """Load cookies from ``self.cookie_file`` into the running browser.

        Cookies are added on a best-effort basis: an entry the browser rejects
        (or that cannot be normalised) is skipped and the rest are still tried.

        Returns:
            ``True`` when the cookie file was read and processed, ``False``
            when there is no driver, the file is missing, or reading failed.
        """
        if not self.driver:
            return False

        if not os.path.exists(self.cookie_file):
            print(f"Cookie file not found: {self.cookie_file}")
            return False

        try:
            # A page on the target domain must be open before adding cookies.
            self.driver.get('https://www.amap.com')
            time.sleep(1)

            with open(self.cookie_file, 'r', encoding='utf-8') as f:
                cookies = json.load(f)

            skipped = 0
            for cookie in cookies:
                try:
                    if 'expiry' in cookie:
                        cookie['expiry'] = int(cookie['expiry'])
                    if 'sameSite' in cookie and cookie['sameSite'] not in ['Strict', 'Lax', 'None']:
                        del cookie['sameSite']
                    self.driver.add_cookie(cookie)
                except Exception:
                    # Best effort: one bad cookie must not abort the others.
                    skipped += 1
            if skipped:
                print(f"Skipped {skipped} cookie(s) that could not be added.")
            return True
        except Exception as e:
            print(f"Failed to load cookies: {e}")
            return False

    def ensure_logged_in(self):
        """Start the browser, restore cookies and open the picker tool page.

        A cookie-loading failure only produces a warning; navigation to the
        picker page happens regardless.
        """
        self._init_driver()
        if not self._load_cookies():
            print("Warning: Failed to load cookies, coordinate query might fail.")

        # Navigate to the picker tool and give the page time to load.
        self.driver.get('https://lbs.amap.com/tools/picker')
        time.sleep(2)

    def parse_coordinates_info(self, text):
        """Extract the first plausible ``(lng, lat)`` pair from *text*.

        Whitespace (including the ideographic space) and ASCII/full-width
        commas are all treated as separators. A pair is accepted only when
        ``70 <= lng <= 140`` and ``0 <= lat <= 60``.

        Args:
            text: Raw text such as ``"116.397428,39.90923"``.

        Returns:
            A ``(lng, lat)`` tuple of floats, or ``None`` when *text* is
            empty, not a string, or contains no acceptable pair.
        """
        if not text:
            return None
        try:
            # Turn every separator run into a single ASCII comma.
            clean_text = re.sub(r'[\u3000\t\n\r ]+', ',', text)
            clean_text = re.sub(r'[,\uFF0C]+', ',', clean_text).strip(',')
            pattern = (
                r'([7-9]?[0-9]\.[0-9]{1,}|1[0-3][0-9]\.[0-9]{1,})'
                r'[,\uFF0C]\s*'
                r'([1-8]?[0-9]\.[0-9]{1,}|90\.[0-9]{1,})'
            )
            for match in re.finditer(pattern, clean_text):
                lng_float = float(match.group(1))
                lat_float = float(match.group(2))
                if 70 <= lng_float <= 140 and 0 <= lat_float <= 60:
                    # The first valid pair wins.
                    return (lng_float, lat_float)
            return None
        except (TypeError, ValueError):
            # Non-string input or an unparsable number: treat as "no result".
            return None

    def get_coordinate(self, address: str):
        """Query the coordinate for a single address.

        Starts the browser session on first use, types *address* into the
        picker page's search box, submits it and reads the coordinate field.

        Args:
            address: Address text to search for.

        Returns:
            A ``(lng, lat)`` tuple of floats, or ``(None, None)`` when the
            address is empty, nothing could be parsed, or the page interaction
            failed.
        """
        if not address:
            return None, None

        if not self.driver:
            self.ensure_logged_in()

        try:
            # Search input
            search_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'txtSearch'))
            )
            search_input.clear()
            search_input.send_keys(address)

            # Click the search button; fall back to pressing Enter.
            try:
                search_button = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'btn-search'))
                )
                search_button.click()
            except Exception:
                search_input.send_keys(Keys.ENTER)

            time.sleep(2)  # Wait for result

            # Read the coordinate from the field's value, else its text.
            coordinate_text = None
            try:
                coord_elem = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.ID, 'txtCoordinate'))
                )
                val = coord_elem.get_attribute('value')
                if val and val.strip():
                    coordinate_text = val.strip()
                else:
                    coordinate_text = coord_elem.text.strip()
            except Exception:
                # Missing coordinate field is reported as "no result" below.
                pass

            if coordinate_text:
                result = self.parse_coordinates_info(coordinate_text)
                if result:
                    return result

            return None, None

        except Exception as e:
            print(f"Error querying address '{address}': {e}")
            return None, None

    def close(self):
        """Quit the browser, ignoring shutdown errors, and forget the driver."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception:
                # Shutdown is best effort; the driver is dropped regardless.
                pass
            self.driver = None

    def __enter__(self):
        """Return the kit itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the browser when the ``with`` block ends."""
        self.close()

    def login_and_save_cookies(self):
        """Open a visible browser for manual login, then save its cookies.

        Blocks on ``input()`` until the user confirms that login is complete.
        If the kit was headless, any running browser is closed and
        ``self.headless`` is set to ``False``; it is not switched back
        afterwards.

        Returns:
            ``True`` when the cookies were written to ``self.cookie_file``,
            ``False`` when reading or saving them failed.
        """
        # Manual login needs a visible window, so force non-headless mode.
        if self.headless:
            print("切换至可视化模式进行登录...")
            self.close()
            self.headless = False

        self._init_driver()

        print("正在跳转至高德地图登录页面...")
        # Open the picker page, where the user can sign in manually.
        self.driver.get('https://lbs.amap.com/tools/picker')

        print("\n" + "="*60)
        print("操作指引:")
        print("1. 浏览器窗口已打开,请在窗口中手动完成登录。")
        print("2. 您可以选择扫码登录或账号密码登录。")
        print("3. 登录成功并跳转到控制台页面后,请回到此窗口。")
        print("4. 在下方按 [回车键(Enter)],程序将自动抓取并保存 Cookie。")
        print("="*60 + "\n")

        input("等待登录完成,确认后请按 [Enter] 继续...")

        # Save the cookies of the logged-in session.
        try:
            cookies = self.driver.get_cookies()
            with open(self.cookie_file, 'w', encoding='utf-8') as f:
                json.dump(cookies, f, indent=4)
            print(f"成功:Cookie 已保存至: {self.cookie_file}")
            return True
        except Exception as e:
            print(f"错误:保存 Cookie 失败: {e}")
            return False
|
||
|
||
if __name__ == "__main__":
    # Manual smoke test. Expects 'amap_cookies.json' next to this script (or
    # pass a full path via cookie_file); without cookies the lookup may fail.
    # The context manager closes the browser even if the lookup raises.
    with PaChongGaoDeKit(headless=False) as kit:
        sample_address = "北京市天安门"
        longitude, latitude = kit.get_coordinate(sample_address)
        print(f"{sample_address}: {longitude}, {latitude}")
|