Files
aiData/Json/Ref/T7_PaChongGaoDePatch.py
HuangHai ca23ebf606 'commit'
2026-01-12 08:09:32 +08:00

810 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import random
import asyncio
import math
from typing import List, Tuple, Dict, Any
from DbKit.Db import Db
def is_point_in_polygon(point: Tuple[float, float], polygon: List[Tuple[float, float]]) -> bool:
"""
射线法判断点是否在多边形内
:param point: (经度, 纬度)
:param polygon: 多边形顶点坐标列表
:return: True表示在多边形内
"""
x, y = point
n = len(polygon)
inside = False
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
# 检查点是否在边的y范围内
if ((y1 > y) != (y2 > y)):
# 计算交点x坐标
x_intersect = (x2 - x1) * (y - y1) / (y2 - y1) + x1
if x_intersect > x:
inside = not inside
return inside
def load_geojson(json_path: str) -> List[List[Tuple[float, float]]]:
"""
从GeoJSON文件加载多边形数据支持Polygon和MultiPolygon
返回: 多边形列表,每个多边形是(经度, 纬度)元组的列表
"""
with open(json_path, 'r', encoding='utf-8') as f:
geo_data = json.load(f)
polygons = []
geometry_type = geo_data['features'][0]['geometry']['type']
coordinates = geo_data['features'][0]['geometry']['coordinates']
if geometry_type == 'Polygon':
polygons.append(coordinates[0])
elif geometry_type == 'MultiPolygon':
for polygon_coords in coordinates:
polygons.append(polygon_coords[0])
return polygons
# 计算点集的分布统计信息
def calculate_distribution_statistics(points):
"""
计算点集的分布统计信息
:param points: 点集 [(lon, lat), ...]
:return: 包含min_lon, max_lon, min_lat, max_lat, center_lon, center_lat, std_lon, std_lat的字典
"""
if not points:
return None
lons = [p[0] for p in points]
lats = [p[1] for p in points]
min_lon, max_lon = min(lons), max(lons)
min_lat, max_lat = min(lats), max(lats)
# 计算中心点
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
# 计算标准差(衡量分布的分散程度)
mean_lon = sum(lons) / len(lons)
mean_lat = sum(lats) / len(lats)
std_lon = math.sqrt(sum((lon - mean_lon) ** 2 for lon in lons) / len(lons))
std_lat = math.sqrt(sum((lat - mean_lat) ** 2 for lat in lats) / len(lats))
return {
"min_lon": min_lon,
"max_lon": max_lon,
"min_lat": min_lat,
"max_lat": max_lat,
"center_lon": center_lon,
"center_lat": center_lat,
"std_lon": std_lon,
"std_lat": std_lat
}
# 线性映射函数
def linear_map(value, from_min, from_max, to_min, to_max):
"""
将值从一个范围线性映射到另一个范围
:param value: 输入值
:param from_min: 原始范围最小值
:param from_max: 原始范围最大值
:param to_min: 目标范围最小值
:param to_max: 目标范围最大值
:return: 映射后的值
"""
# 处理除零情况
if from_max == from_min:
return (to_min + to_max) / 2
# 计算映射值
return (value - from_min) * (to_max - to_min) / (from_max - from_min) + to_min
# 计算点到线段的最近点
def closest_point_on_segment(point, line_start, line_end):
"""
计算点到线段的最近点
:param point: 点 [lon, lat]
:param line_start: 线段起点 [lon, lat]
:param line_end: 线段终点 [lon, lat]
:return: 线段上的最近点 [lon, lat]
"""
# 将经纬度转换为向量
a = [point[0] - line_start[0], point[1] - line_start[1]]
b = [line_end[0] - line_start[0], line_end[1] - line_start[1]]
# 计算投影参数t
t = max(0, min(1, (a[0] * b[0] + a[1] * b[1]) / (b[0] ** 2 + b[1] ** 2) if (b[0] ** 2 + b[1] ** 2) > 0 else 0))
# 计算投影点
return [line_start[0] + t * b[0], line_start[1] + t * b[1]]
# 计算两点之间的距离
def calculate_distance(point1, point2):
"""
计算两点之间的欧几里得距离
:param point1: 第一个点 [lon, lat]
:param point2: 第二个点 [lon, lat]
:return: 两点之间的距离
"""
return math.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2)
# 调整点到多边形边界
def adjust_to_polygon_boundary(point, polygons):
"""
调整点到最近的多边形边界
:param point: 需要调整的点 [lon, lat]
:param polygons: 多边形列表
:return: 调整后的点 [lon, lat]
"""
min_distance = float('inf')
closest_point = point
for polygon in polygons:
for i in range(len(polygon)):
p1 = polygon[i]
p2 = polygon[(i + 1) % len(polygon)]
# 计算点到线段的最近点
proj_point = closest_point_on_segment(point, p1, p2)
distance = calculate_distance(point, proj_point)
if distance < min_distance:
min_distance = distance
closest_point = proj_point
return closest_point
# 将原始坐标映射到目标区域,保持相对位置关系
def map_coordinates(original_points, polygons):
"""
将原始坐标映射到目标区域,保持相对位置关系
:param original_points: 原始坐标列表 [(original_lon, original_lat), ...]
:param polygons: 目标区域的多边形列表
:return: 映射后的坐标列表 [(mapped_lon, mapped_lat), ...]
"""
if not original_points or not polygons:
return []
# 计算原始点集的分布统计
original_stats = calculate_distribution_statistics(original_points)
if not original_stats:
return []
# 获取目标区域的边界信息
all_target_points = []
for polygon in polygons:
all_target_points.extend(polygon)
if not all_target_points:
return []
target_lons = [p[0] for p in all_target_points]
target_lats = [p[1] for p in all_target_points]
target_min_lon, target_max_lon = min(target_lons), max(target_lons)
target_min_lat, target_max_lat = min(target_lats), max(target_lats)
# 收缩目标边界框,使生成的坐标更靠近县区中心
shrink_factor = 0.25 # 收缩25%,增大收缩比例使坐标更靠近中心
delta_lon = (target_max_lon - target_min_lon) * shrink_factor
delta_lat = (target_max_lat - target_min_lat) * shrink_factor
target_min_lon += delta_lon
target_max_lon -= delta_lon
target_min_lat += delta_lat
target_max_lat -= delta_lat
# 计算原始点集的边界
original_min_lon = original_stats["min_lon"]
original_max_lon = original_stats["max_lon"]
original_min_lat = original_stats["min_lat"]
original_max_lat = original_stats["max_lat"]
mapped_points = []
for original_lon, original_lat in original_points:
# 线性映射经度和纬度
mapped_lon = linear_map(original_lon, original_min_lon, original_max_lon, target_min_lon, target_max_lon)
mapped_lat = linear_map(original_lat, original_min_lat, original_max_lat, target_min_lat, target_max_lat)
# 检查映射后的点是否在目标区域内
if any(is_point_in_polygon((mapped_lon, mapped_lat), polygon) for polygon in polygons):
mapped_points.append((mapped_lon, mapped_lat))
else:
# 如果映射后的点不在目标区域内,调整到最近的边界
adjusted_lon, adjusted_lat = adjust_to_polygon_boundary([mapped_lon, mapped_lat], polygons)
mapped_points.append((adjusted_lon, adjusted_lat))
return mapped_points
# 传统的均匀分布点生成算法(作为回退方案)
def generate_uniform_points_fallback(polygons, n_points):
"""
在指定的多边形区域内生成均匀分布的点(回退方案)
:param polygons: 多边形列表,每个多边形是坐标点的列表
:param n_points: 需要生成的点数
:return: 生成的点列表 [(lon, lat), ...]
"""
if not polygons:
return []
# 计算整体边界框
all_lons = [p[0] for polygon in polygons for p in polygon]
all_lats = [p[1] for polygon in polygons for p in polygon]
min_lon, max_lon = min(all_lons), max(all_lons)
min_lat, max_lat = min(all_lats), max(all_lats)
# 收缩边界框,使生成的坐标更靠近县区中心
lon_margin = (max_lon - min_lon) * 0.25 # 收缩25%,增大收缩比例使坐标更靠近中心
lat_margin = (max_lat - min_lat) * 0.25
min_lon = min_lon + lon_margin
max_lon = max_lon - lon_margin
min_lat = min_lat + lat_margin
max_lat = max_lat - lat_margin
# 计算每个多边形的面积,用于按比例分配点数
def calculate_polygon_area(polygon):
# 使用Shoelace公式计算多边形面积
area = 0.0
n = len(polygon)
for i in range(n):
j = (i + 1) % n
area += polygon[i][0] * polygon[j][1]
area -= polygon[j][0] * polygon[i][1]
return abs(area) / 2.0
# 计算每个多边形的面积
polygon_areas = [calculate_polygon_area(polygon) for polygon in polygons]
total_area = sum(polygon_areas)
# 根据面积比例分配每个多边形需要生成的点数
if total_area > 0:
points_per_polygon = [max(1, int(area / total_area * n_points)) for area in polygon_areas]
# 调整点数确保总和等于n_points
points_per_polygon[-1] += n_points - sum(points_per_polygon)
else:
points_per_polygon = [n_points // len(polygons)] * len(polygons)
points_per_polygon[-1] += n_points - sum(points_per_polygon)
points = []
# 为每个多边形单独生成点
for polygon_index, (polygon, points_needed) in enumerate(zip(polygons, points_per_polygon)):
if points_needed <= 0:
continue
# 计算当前多边形的边界框
polygon_lons = [p[0] for p in polygon]
polygon_lats = [p[1] for p in polygon]
poly_min_lon, poly_max_lon = min(polygon_lons), max(polygon_lons)
poly_min_lat, poly_max_lat = min(polygon_lats), max(polygon_lats)
# 收缩当前多边形的边界框,使其更靠近中心
poly_lon_margin = (poly_max_lon - poly_min_lon) * 0.25 # 收缩25%,与全局收缩比例保持一致
poly_lat_margin = (poly_max_lat - poly_min_lat) * 0.25
poly_min_lon += poly_lon_margin
poly_max_lon -= poly_lon_margin
poly_min_lat += poly_lat_margin
poly_max_lat -= poly_lat_margin
# 为当前多边形生成足够的网格
cells_needed = max(points_needed * 2, 10) # 网格数量是点数的2倍确保分布均匀
cols = math.ceil(math.sqrt(cells_needed))
rows = math.ceil(cells_needed / cols)
cell_width = (poly_max_lon - poly_min_lon) / cols
cell_height = (poly_max_lat - poly_min_lat) / rows
# 记录哪些网格已经生成了点
filled_cells = []
max_attempts_per_cell = 50
# 按网格生成点
for attempt in range(rows * cols * 2): # 增加尝试次数
if len(points) >= n_points:
break
# 随机选择一个网格
i = random.randint(0, rows - 1)
j = random.randint(0, cols - 1)
cell_key = (i, j)
# 如果这个网格已经生成了足够的点,跳过
if filled_cells.count(cell_key) >= math.ceil(points_needed / (rows * cols)):
continue
# 当前网格范围
cell_min_lon = poly_min_lon + j * cell_width
cell_max_lon = poly_min_lon + (j + 1) * cell_width
cell_min_lat = poly_min_lat + i * cell_height
cell_max_lat = poly_min_lat + (i + 1) * cell_height
# 在网格内尝试生成有效点
for _ in range(max_attempts_per_cell):
lon = random.uniform(cell_min_lon, cell_max_lon)
lat = random.uniform(cell_min_lat, cell_max_lat)
# 检查点是否在当前多边形内
if is_point_in_polygon((lon, lat), polygon):
points.append((lon, lat))
filled_cells.append(cell_key)
break
# 如果当前多边形的点数不足,使用均匀分布补充
while len(points) < sum(points_per_polygon[:polygon_index+1]):
lon = random.uniform(poly_min_lon, poly_max_lon)
lat = random.uniform(poly_min_lat, poly_max_lat)
if is_point_in_polygon((lon, lat), polygon):
points.append((lon, lat))
# 如果总点数还是不足,使用全局均匀分布补充
while len(points) < n_points:
# 随机选择一个多边形
polygon = random.choice(polygons)
# 计算该多边形的边界框
polygon_lons = [p[0] for p in polygon]
polygon_lats = [p[1] for p in polygon]
poly_min_lon, poly_max_lon = min(polygon_lons), max(polygon_lons)
poly_min_lat, poly_max_lat = min(polygon_lats), max(polygon_lats)
# 在边界框内随机生成点
for _ in range(100): # 最多尝试100次
lon = random.uniform(poly_min_lon, poly_max_lon)
lat = random.uniform(poly_min_lat, poly_max_lat)
if is_point_in_polygon((lon, lat), polygon):
points.append((lon, lat))
break
return points[:n_points]
# 在指定的多边形区域内生成均匀分布的点
async def generate_uniform_points(polygons: List[List[Tuple[float, float]]], num_points: int, original_coords=None) -> List[Tuple[float, float]]:
"""
在指定的多边形区域内生成均匀分布的点
:param polygons: 多边形列表,每个多边形是坐标点的列表
:param num_points: 需要生成的点数
:param original_coords: 原始坐标列表,用于参考相对位置关系
:return: 生成的点列表 [(lon, lat), ...]
"""
if not polygons:
return []
# 如果提供了原始坐标,使用坐标映射算法
if original_coords and len(original_coords) > 1:
# 过滤掉None值的原始坐标
valid_original_coords = [(coord["original_longitude"], coord["original_latitude"])
for coord in original_coords
if coord["original_longitude"] is not None
and coord["original_latitude"] is not None]
if len(valid_original_coords) >= num_points:
# 如果原始坐标足够,直接使用映射算法
return map_coordinates(valid_original_coords[:num_points], polygons)
elif valid_original_coords: # 如果有部分原始坐标,先映射这些,再补充生成
mapped_points = map_coordinates(valid_original_coords, polygons)
# 计算还需要生成的点数
remaining_points = num_points - len(mapped_points)
if remaining_points > 0:
# 补充生成均匀分布的点
fallback_points = generate_uniform_points_fallback(polygons, remaining_points)
mapped_points.extend(fallback_points)
return mapped_points[:num_points]
# 如果没有原始坐标或原始坐标不足,使用传统的均匀分布点生成算法
return generate_uniform_points_fallback(polygons, num_points)
# 导入数据库操作类
db = Db()
# 一共有哪些县区
async def getAllArea():
sql = "select area_code,area_name from t_dm_area where area_level=3 order by area_code"
return await db.find(sql)
# 根据名称获取县区代码
async def getAreaCode(areaName):
sql = "select area_code from t_dm_area where area_level=3 and area_name=:area_name"
res = await db.findFirst(sql, {"area_name": areaName})
# 需要去掉最后的三个0
return str(res["area_code"])[:-3]
# 从t_school_xy表获取指定县区和学校类型的原始坐标数据
async def get_original_school_coordinates(region_level3: str, school_types: List[str]) -> List[Dict[str, Any]]:
"""
从t_school_xy表获取指定县区和学校类型的原始坐标数据
:param region_level3: 县区名称
:param school_types: 学校类型列表
:return: 包含学校信息和原始坐标的列表
"""
if not school_types:
return []
# 构建SQL查询获取原始坐标数据
placeholders = ",".join([f":type{i}" for i in range(len(school_types))])
sql = f"""
SELECT org_name, school_type, longitude AS original_longitude, latitude AS original_latitude,
region_level1, region_level2, region_level3, region_level4, region_level5
FROM t_school_xy
WHERE region_level3 = :region_level3 AND school_type IN ({placeholders})
"""
params = {"region_level3": region_level3}
# 添加每个学校类型的参数
for i, st in enumerate(school_types):
params[f"type{i}"] = st
# 获取原始坐标数据
original_coords = await db.find(sql, params)
# 转换坐标数据类型确保longitude和latitude是float类型
# 这可以避免与代码中其他float类型进行数学运算时出现类型不兼容错误
from decimal import Decimal
for coord in original_coords:
if isinstance(coord.get("original_longitude"), Decimal):
coord["original_longitude"] = float(coord["original_longitude"])
if isinstance(coord.get("original_latitude"), Decimal):
coord["original_latitude"] = float(coord["original_latitude"])
return original_coords
def get_school_types_by_education_level(education_level: str) -> List[str]:
"""
根据学段获取对应的学校类型列表
:param education_level: 学段名称,可选值:学前、小学、初中、高中
:return: 对应学校类型的列表用于SQL查询中的in子句
"""
# 定义学段到学校类型的映射关系
school_type_mapping = {
"学前": ["附设幼儿班", "幼儿园"],
"小学": ["小学教学点", "小学", "附设小学班"],
"初中": ["初级中学", "专门学校", "附设普通初中班", "附设职业初中班", "九年一贯制学校"],
"高中": ["附设普通高中班", "高级中学", "十二年一贯制学校", "完全中学"]
}
# 返回对应的学校类型列表,如果输入无效则返回空列表
return school_type_mapping.get(education_level, [])
# 获取指定县区+指定学校类型的学校,同时包含原始坐标信息
async def getSchoolList(region_level3, school_type):
# region_level3 是指县区的名称
# 检查是否传入的是学段名称
if school_type in ["学前", "小学", "初中", "高中"]:
school_types = get_school_types_by_education_level(school_type)
if not school_types:
return []
# 使用in子句查询多个学校类型
placeholders = ",".join([f":type{i}" for i in range(len(school_types))])
# 1. 从t_school表获取需要更新的学校列表
sql_school = f"""
select region_level1,region_level2,region_level3,region_level4,region_level5,school_type,org_name
from t_school
where region_level3=:region_level3 and school_type in ({placeholders})
order by region_level1,region_level2,region_level3,region_level4,region_level5,school_type
"""
params = {"region_level3": region_level3}
# 添加每个学校类型的参数
for i, st in enumerate(school_types):
params[f"type{i}"] = st
schools = await db.find(sql_school, params)
if not schools:
return []
# 2. 从t_school_xy表获取原始坐标数据
original_coords = await get_original_school_coordinates(region_level3, school_types)
# 3. 创建原始坐标字典,方便查找
original_coords_dict = {}
for coord in original_coords:
original_coords_dict[coord["org_name"]] = {
"original_longitude": coord["original_longitude"],
"original_latitude": coord["original_latitude"]
}
# 4. 合并原始坐标信息到学校列表
for school in schools:
org_name = school["org_name"]
if org_name in original_coords_dict:
school["original_longitude"] = original_coords_dict[org_name]["original_longitude"]
school["original_latitude"] = original_coords_dict[org_name]["original_latitude"]
else:
# 如果没有原始坐标设置为None
school["original_longitude"] = None
school["original_latitude"] = None
return schools
else:
# 原有的单个学校类型查询逻辑(简化版,不包含原始坐标合并)
sql = "select region_level1,region_level2,region_level3,region_level4,region_level5,school_type,org_name from t_school where region_level3=:region_level3 and school_type=:school_type order by region_level1,region_level2,region_level3,region_level4,region_level5,school_type"
schools = await db.find(sql, {"region_level3": region_level3, "school_type": school_type})
# 获取原始坐标并合并
original_coords = await get_original_school_coordinates(region_level3, [school_type])
original_coords_dict = {
coord["org_name"]: {
"original_longitude": coord["original_longitude"],
"original_latitude": coord["original_latitude"]
}
for coord in original_coords
}
for school in schools:
org_name = school["org_name"]
if org_name in original_coords_dict:
school["original_longitude"] = original_coords_dict[org_name]["original_longitude"]
school["original_latitude"] = original_coords_dict[org_name]["original_latitude"]
else:
school["original_longitude"] = None
school["original_latitude"] = None
return schools
# 获取指定县区下+指定学段的学校数量
async def getSchoolCount(region_level3, school_type):
# 检查是否传入的是学段名称
if school_type in ["学前", "小学", "初中", "高中"]:
school_types = get_school_types_by_education_level(school_type)
if not school_types:
return {"c": 0}
# 使用in子句查询多个学校类型
placeholders = ",".join([f":type{i}" for i in range(len(school_types))])
sql = f"select count(1) as c from t_school where region_level3=:region_level3 and school_type in ({placeholders})"
params = {"region_level3": region_level3}
# 添加每个学校类型的参数
for i, st in enumerate(school_types):
params[f"type{i}"] = st
return await db.findFirst(sql, params)
else:
# 原有的单个学校类型查询逻辑
sql = "select count(1) as c from t_school where region_level3=:region_level3 and school_type=:school_type"
return await db.findFirst(sql, {"region_level3": region_level3, "school_type": school_type})
# 更新学校经纬度
async def updateSchool(org_name, longitude, latitude):
sql = "update t_school set longitude=:longitude,latitude=:latitude where org_name=:org_name"
return await db.execute_update(sql, {"org_name": org_name, "longitude": longitude, "latitude": latitude})
# 重置所有学校坐标为0
async def reset_all_school_coordinates():
"""
将所有学校的经纬度坐标重置为0
"""
print("\n正在重置所有学校坐标...")
sql = "update t_school set longitude=0,latitude=0"
try:
result = await db.execute_update(sql, {})
print(f"✅ 成功重置 {result} 条学校记录的坐标")
return True
except Exception as e:
print(f"❌ 重置坐标失败: {str(e)}")
return False
async def get_schools_by_county_and_level(county_name, education_level):
"""
获取指定县区和学段的学校列表,包含当前坐标信息
只从t_school表获取数据不依赖t_school_xy表
"""
# 获取对应的学校类型列表
school_types = get_school_types_by_education_level(education_level)
if not school_types:
return []
# 使用in子句查询多个学校类型
placeholders = ",".join([f":type{i}" for i in range(len(school_types))])
# 从t_school表获取学校数据和当前坐标
sql = f"""
select region_level1, region_level2, region_level3, region_level4, region_level5,
school_type, org_name, longitude, latitude
from t_school
where region_level3=:county_name and school_type in ({placeholders})
order by region_level1, region_level2, region_level3, region_level4, region_level5, school_type
"""
params = {"county_name": county_name}
# 添加每个学校类型的参数
for i, st in enumerate(school_types):
params[f"type{i}"] = st
schools = await db.find(sql, params)
# 转换坐标数据类型确保longitude和latitude是float类型
from decimal import Decimal
for school in schools:
if isinstance(school.get("longitude"), Decimal):
school["longitude"] = float(school["longitude"])
if isinstance(school.get("latitude"), Decimal):
school["latitude"] = float(school["latitude"])
return schools
def optimize_school_distribution(schools, polygons):
"""
优化学校坐标分布
- 对于已经在多边形内部的坐标,保持不变
- 对于不在多边形内部的坐标,将其调整到多边形中
"""
if not schools:
return []
# 获取目标区域的边界信息
all_target_points = []
for polygon in polygons:
all_target_points.extend(polygon)
target_lons = [p[0] for p in all_target_points]
target_lats = [p[1] for p in all_target_points]
target_min_lon, target_max_lon = min(target_lons), max(target_lons)
target_min_lat, target_max_lat = min(target_lats), max(target_lats)
# 处理每个学校的坐标
optimized_coords = []
for school in schools:
lon, lat = school["longitude"], school["latitude"]
# 检查坐标是否已经在多边形内
if any(is_point_in_polygon((lon, lat), polygon) for polygon in polygons):
# 如果在多边形内,保持原坐标不变
optimized_coords.append((lon, lat))
else:
# 如果不在多边形内,将其调整到最近的多边形边界
adjusted_lon, adjusted_lat = adjust_to_polygon_boundary([lon, lat], polygons)
optimized_coords.append((adjusted_lon, adjusted_lat))
return optimized_coords
async def process_school_coordinates():
"""
优化t_school表中现有学校坐标分布
按县区和学段分组处理,美化坐标分布
注意这是一个补丁程序只修改现有坐标不重置为0
"""
print("\n===== 学校坐标分布优化补丁 =====")
print("正在开始优化现有学校坐标分布...")
try:
# 1. 获取所有县区信息
areas = await getAllArea()
print(f"\n✅ 成功获取 {len(areas)} 个县区信息")
# 2. 定义需要处理的学段
education_levels = ["学前", "小学", "初中", "高中"]
# 3. 统计信息
total_schools = 0
processed_schools = 0
failed_counties = 0
# 4. 遍历所有县区
for area in areas:
county_name = area["area_name"]
area_code = str(area["area_code"])[:-3] # 去掉最后三位0
print(f"\n🔄 正在处理: {county_name} (代码: {area_code})")
# 5. 构建GeoJSON文件路径
geojson_path = f"d:\\dsWork\\YunNanProject\\Init\\Json\\{area_code}.json"
try:
# 6. 加载县区边界数据
polygons = load_geojson(geojson_path)
print(f"✅ 成功加载 {county_name} 边界数据")
# 7. 遍历所有学段
for education_level in education_levels:
print(f" 📚 学段: {education_level}")
try:
# 8. 获取该县区该学段的学校列表及其现有坐标
schools = await get_schools_by_county_and_level(county_name, education_level)
if not schools:
print(f" ⏭️ 该学段无待处理学校")
continue
# 过滤掉没有坐标的学校
schools_with_coords = [s for s in schools if s["longitude"] is not None and s["latitude"] is not None]
if not schools_with_coords:
print(f" ⏭️ 该学段没有有效坐标的学校")
continue
print(f" 📋 找到 {len(schools_with_coords)} 所需要优化坐标的学校")
total_schools += len(schools_with_coords)
# 9. 优化学校坐标分布
optimized_coords = optimize_school_distribution(schools_with_coords, polygons)
# 10. 更新学校坐标
for i, (school, (lon, lat)) in enumerate(zip(schools_with_coords, optimized_coords), 1):
org_name = school["org_name"]
result = await updateSchool(org_name, lon, lat)
if result:
processed_schools += 1
print(f"{i}/{len(schools_with_coords)} 已优化: {org_name}")
else:
print(f"{i}/{len(schools_with_coords)} 优化失败: {org_name}")
except Exception as e:
print(f" ⚠️ 处理{education_level}学段时出错: {str(e)}")
continue
except FileNotFoundError:
print(f"❌ 未找到 {county_name} 的GeoJSON文件: {geojson_path}")
failed_counties += 1
except Exception as e:
print(f"❌ 处理{county_name}时出错: {str(e)}")
failed_counties += 1
continue
# 12. 输出统计结果
print("\n" + "="*50)
print("🎯 批量处理完成!")
print(f"📊 统计信息:")
print(f" - 总县区数: {len(areas)}")
print(f" - 成功处理县区数: {len(areas) - failed_counties}")
print(f" - 失败县区数: {failed_counties}")
print(f" - 待处理学校总数: {total_schools}")
print(f" - 成功更新学校数: {processed_schools}")
print("="*50)
except Exception as e:
print(f"\n❌ 批量处理过程中发生错误: {str(e)}")
import traceback
traceback.print_exc()
raise
finally:
# 确保在正常执行和异常情况下都关闭数据库连接
if 'db' in globals() and hasattr(db, 'engine') and db.engine:
try:
await db.engine.dispose()
print("\n✅ 数据库连接已成功关闭")
except Exception as dispose_error:
print(f"\n❌ 关闭数据库连接时出错: {str(dispose_error)}")
if __name__ == "__main__":
# 直接开始批量处理 - 为所有学校生成坐标
try:
asyncio.run(process_school_coordinates())
except KeyboardInterrupt:
print("\n⚠️ 程序被用户中断")
except Exception as e:
# 捕获并处理所有异常包括asyncio和SSL相关错误
print(f"\n❌ 程序执行出错: {str(e)}")
print(" 错误类型:", type(e).__name__)
# 仅在需要详细调试时显示完整堆栈
print("\n详细错误信息:")
import traceback
traceback.print_exc()
finally:
print("\n程序已退出")