268 lines
10 KiB
Python
268 lines
10 KiB
Python
|
|
"""
|
|||
|
|
高德地图高精度坐标获取工具类
|
|||
|
|
"""
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import random
|
|||
|
|
import re
|
|||
|
|
import time
|
|||
|
|
from selenium import webdriver
|
|||
|
|
from selenium.webdriver.chrome.options import Options
|
|||
|
|
from selenium.webdriver.chrome.service import Service
|
|||
|
|
from selenium.webdriver.common.by import By
|
|||
|
|
from selenium.webdriver.common.keys import Keys
|
|||
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|||
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|||
|
|
|
|||
|
|
class PaChongGaoDeKit:
    """Selenium-driven helper that looks up coordinates on the Amap picker page.

    The helper starts a Chrome session, optionally restores cookies from a
    JSON file, opens ``https://lbs.amap.com/tools/picker`` and reads the
    ``txtCoordinate`` field after searching for an address.

    The instance is a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    # Whitespace (including the ideographic space U+3000) is turned into commas
    # so that "lng lat" and "lng, lat" are parsed the same way.
    _WHITESPACE_RE = re.compile(r'[\u3000\t\n\r ]+')
    # Runs of ASCII and full-width (U+FF0C) commas collapse to one ASCII comma.
    _COMMA_RUN_RE = re.compile(r'[,\uff0c]+')
    # "<lng>,<lat>" with a decimal part on both numbers.  The look-behind keeps
    # the match from starting in the middle of a longer number such as
    # "1116.39", which would otherwise be read as longitude 116.39.
    _COORD_RE = re.compile(
        r'(?<![0-9.])'
        r'([7-9]?[0-9]\.[0-9]{1,}|1[0-3][0-9]\.[0-9]{1,})'
        r',\s*'
        r'([1-8]?[0-9]\.[0-9]{1,}|90\.[0-9]{1,})'
    )

    def __init__(self, headless=True, cookie_file='amap_cookies.json'):
        """Store the settings; the browser itself is started lazily.

        Args:
            headless: Whether Chrome is started with ``--headless``.
            cookie_file: Path of the JSON cookie file.  A relative path is
                resolved against this module's directory when a file with
                that name exists there; otherwise it is kept unchanged (and
                is therefore interpreted relative to the working directory).
        """
        self.headless = headless
        self.cookie_file = cookie_file
        self.driver = None

        if not os.path.isabs(self.cookie_file):
            current_dir = os.path.dirname(os.path.abspath(__file__))
            potential_path = os.path.join(current_dir, self.cookie_file)
            if os.path.exists(potential_path):
                self.cookie_file = potential_path

    def _init_driver(self):
        """Start Chrome once and keep the driver in ``self.driver``.

        Does nothing when a driver already exists.

        Raises:
            Exception: Whatever Selenium raised while starting or preparing
                the browser.  A partially started driver is shut down before
                the exception is re-raised.
        """
        if self.driver:
            return

        print(f"正在启动 Chrome 浏览器 (无头模式={self.headless})...")
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')

        # Common switches that work around driver/browser mismatches.
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--lang=zh-CN')
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        # Default service: let Selenium locate the driver on its own.
        service = Service()
        try:
            # Optional dependency, imported lazily so the class still works
            # without it.
            from webdriver_manager.chrome import ChromeDriverManager

            # Download a ChromeDriver that matches the installed browser.
            driver_path = ChromeDriverManager().install()
            service = Service(driver_path)
            print(f"已成功加载 ChromeDriver: {driver_path}")
        except ImportError:
            print("提示:未安装 webdriver_manager 库,建议安装以获得更好的驱动管理体验:pip install webdriver-manager")
        except Exception as e:
            print(f"提示:自动管理驱动失败,尝试使用系统默认驱动。错误原因: {e}")

        try:
            self.driver = webdriver.Chrome(service=service, options=chrome_options)

            if not self.headless:
                self.driver.maximize_window()

            # Hide ``navigator.webdriver`` from every document that is loaded.
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            })
        except Exception as e:
            print(f"致命错误:无法启动 Chrome 浏览器。请确保已安装 Chrome 且版本与驱动匹配。详情: {e}")
            # Do not keep a half-initialised browser around: a later call
            # would silently reuse it because ``self.driver`` is truthy.
            self.close()
            raise

    def _load_cookies(self):
        """Add the cookies stored in ``self.cookie_file`` to the browser.

        Returns:
            ``True`` when the cookie file was read and processed, ``False``
            when there is no driver, the file is missing, or reading it
            failed.  Individual cookies the browser rejects are skipped and
            counted in a printed summary; they do not change the result.
        """
        if not self.driver:
            return False

        if not os.path.exists(self.cookie_file):
            print(f"Cookie file not found: {self.cookie_file}")
            return False

        try:
            # A page of the target domain must be open before cookies can be
            # added for it.
            self.driver.get('https://www.amap.com')
            time.sleep(1)

            with open(self.cookie_file, 'r', encoding='utf-8') as f:
                cookies = json.load(f)

            skipped = 0
            for raw_cookie in cookies:
                try:
                    # Work on a copy so the loaded data is left untouched.
                    cookie = dict(raw_cookie)
                    if 'expiry' in cookie:
                        cookie['expiry'] = int(cookie['expiry'])
                    if 'sameSite' in cookie and cookie['sameSite'] not in ['Strict', 'Lax', 'None']:
                        del cookie['sameSite']
                    self.driver.add_cookie(cookie)
                except Exception:
                    # Best effort: one malformed or rejected cookie must not
                    # abort the whole restore.
                    skipped += 1

            if skipped:
                print(f"Skipped {skipped} cookie(s) that could not be added.")
            return True
        except Exception as e:
            print(f"Failed to load cookies: {e}")
            return False

    def ensure_logged_in(self):
        """Start the browser, try to restore cookies and open the picker page.

        A failed cookie restore only prints a warning; the picker page is
        opened either way.
        """
        self._init_driver()
        if not self._load_cookies():
            print("Warning: Failed to load cookies, coordinate query might fail.")

        # Navigate to the picker tool.
        self.driver.get('https://lbs.amap.com/tools/picker')
        time.sleep(2)

    def parse_coordinates_info(self, text):
        """Extract the first plausible ``(lng, lat)`` pair from *text*.

        Whitespace and ASCII or full-width commas are accepted as separators.
        A pair is accepted only when ``70 <= lng <= 140`` and
        ``0 <= lat <= 60``.

        Args:
            text: Raw text, e.g. ``"116.397428,39.90923"``.

        Returns:
            A ``(lng, lat)`` tuple of floats, or ``None`` when *text* is
            empty, is not a string, or contains no acceptable pair.
        """
        if not text:
            return None

        try:
            clean_text = self._WHITESPACE_RE.sub(',', text)
        except TypeError:
            # Not a string.
            return None
        clean_text = self._COMMA_RUN_RE.sub(',', clean_text).strip(',')

        for match in self._COORD_RE.finditer(clean_text):
            # The pattern only matches "digits.digits", so float() cannot fail.
            lng = float(match.group(1))
            lat = float(match.group(2))
            if 70 <= lng <= 140 and 0 <= lat <= 60:
                return lng, lat
        return None

    def _clear_coordinate_field(self):
        """Blank the ``txtCoordinate`` field so a stale value cannot be read."""
        try:
            for field in self.driver.find_elements(By.ID, 'txtCoordinate'):
                self.driver.execute_script("arguments[0].value = '';", field)
        except Exception:
            # Best effort: if clearing fails the lookup still proceeds.
            pass

    def _read_coordinate_text(self):
        """Return the text of the ``txtCoordinate`` field, or ``None`` on failure."""
        try:
            coord_elem = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'txtCoordinate'))
            )
            val = coord_elem.get_attribute('value')
            if val and val.strip():
                return val.strip()
            return coord_elem.text.strip()
        except Exception:
            return None

    def get_coordinate(self, address: str):
        """Query the coordinate for a single address.

        The browser session is started on first use via
        :meth:`ensure_logged_in`; errors raised while starting it propagate
        to the caller.

        Args:
            address: Address text typed into the picker's search box.

        Returns:
            A ``(lng, lat)`` tuple of floats, or ``(None, None)`` when the
            address is empty, nothing could be parsed, or the page
            interaction failed.
        """
        if not address:
            return None, None

        if not self.driver:
            self.ensure_logged_in()

        try:
            # Search input.
            search_input = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'txtSearch'))
            )

            # Without this, a search that yields nothing would leave the
            # previous address's coordinate in the field and it would be
            # returned for this address.
            self._clear_coordinate_field()

            search_input.clear()
            search_input.send_keys(address)

            # Click the search button; fall back to pressing Enter.
            try:
                search_button = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'btn-search'))
                )
                search_button.click()
            except Exception:
                search_input.send_keys(Keys.ENTER)

            time.sleep(2)  # Wait for the result.

            coordinate_text = self._read_coordinate_text()
            if coordinate_text:
                result = self.parse_coordinates_info(coordinate_text)
                if result:
                    return result

            return None, None

        except Exception as e:
            print(f"Error querying address '{address}': {e}")
            return None, None

    def close(self):
        """Quit the browser if one is running and forget the driver."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception:
                # The browser may already be gone; nothing useful to do.
                pass
            finally:
                self.driver = None

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the browser; exceptions from the ``with`` body propagate."""
        self.close()

    def login_and_save_cookies(self):
        """Open a visible browser for manual login and save the cookies.

        Blocks on ``input()`` until the user confirms that login is done.
        When the instance was headless, any running browser is closed and
        ``self.headless`` is set to ``False`` (it stays that way afterwards).

        Returns:
            ``True`` when the cookies were written to ``self.cookie_file``,
            ``False`` when reading or writing them failed.
        """
        # Manual login needs a visible window, so force non-headless mode.
        if self.headless:
            print("切换至可视化模式进行登录...")
            self.close()
            self.headless = False

        self._init_driver()

        print("正在跳转至高德地图登录页面...")
        # Open a page that normally requires login.
        self.driver.get('https://lbs.amap.com/tools/picker')

        print("\n" + "="*60)
        print("操作指引:")
        print("1. 浏览器窗口已打开,请在窗口中手动完成登录。")
        print("2. 您可以选择扫码登录或账号密码登录。")
        print("3. 登录成功并跳转到控制台页面后,请回到此窗口。")
        print("4. 在下方按 [回车键(Enter)],程序将自动抓取并保存 Cookie。")
        print("="*60 + "\n")

        input("等待登录完成,确认后请按 [Enter] 继续...")

        # Save the cookies.
        try:
            cookies = self.driver.get_cookies()
            with open(self.cookie_file, 'w', encoding='utf-8') as f:
                json.dump(cookies, f, indent=4)
            print(f"成功:Cookie 已保存至: {self.cookie_file}")
            return True
        except Exception as e:
            print(f"错误:保存 Cookie 失败: {e}")
            return False
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Manual smoke test.
    # Put 'amap_cookies.json' next to this file or pass a full path via
    # ``cookie_file``.  Without cookies the picker page may not load correctly
    # if it requires login.
    #
    # The class is its own context manager, so the browser is closed on exit
    # even when the lookup raises.
    with PaChongGaoDeKit(headless=False) as kit:
        addr = "北京市天安门"
        lng, lat = kit.get_coordinate(addr)
        print(f"{addr}: {lng}, {lat}")
|