diff --git a/Apps/TeLaiDian/Config/Setting.py b/Apps/TeLaiDian/Config/Setting.py index 2611a60..7f5772c 100644 --- a/Apps/TeLaiDian/Config/Setting.py +++ b/Apps/TeLaiDian/Config/Setting.py @@ -10,8 +10,8 @@ DEBUG_BOX_COLOR = (0, 255, 0) DEBUG_BOX_THICKNESS = 3 # 等待时间配置 (秒) -WAIT_DETAIL_PAGE_LOAD = 2.0 -WAIT_BACK_TO_LIST = 1.0 +WAIT_DETAIL_PAGE_LOAD = 3.5 +WAIT_BACK_TO_LIST = 1.5 WAIT_AFTER_SCROLL = 2.5 # 坐标计算与安全防护 diff --git a/Apps/TeLaiDian/Crawler.py b/Apps/TeLaiDian/Crawler.py index 30ce41b..d0fe76e 100644 --- a/Apps/TeLaiDian/Crawler.py +++ b/Apps/TeLaiDian/Crawler.py @@ -1,10 +1,14 @@ # coding=utf-8 import asyncio -import logging +import asyncio import os import sys import time -from Apps.TeLaiDian.Kit import take_screenshot, get_image_content_md5, clean_station_name, setup_logger +import cv2 +from Apps.TeLaiDian.Kit import ( + take_screenshot, get_image_content_md5, clean_station_name, + setup_logger, detect_price_click_point_cv, read_image, save_image +) from Apps.TeLaiDian.ReadImageKit import ReadImageKit from Apps.TeLaiDian.Service import TeLaiDianService from Apps.TeLaiDian.Config.Setting import ( @@ -120,37 +124,92 @@ class TeLaiDianCrawler(BaseCrawler): d.swipe_ext("up", scale=DETAIL_SCROLL_DISTANCE_RATIO) await asyncio.sleep(1.5) - # 3. 点击“价格信息”按钮 (jgxx.jpg) - template_path = os.path.join(project_root, "Apps", "TeLaiDian", "Template", "jgxx.jpg") - logger.info(f"尝试点击价格详情按钮: {template_path}") + # 3. 点击“价格信息”区域 (识别橘红色价格 P0) + price_button_screen = take_screenshot(d, f"tld_before_price_click_{int(time.time())}.jpg") + logger.info("正在通过 CV 寻找橘红色价格区域 (P0)...") + + click_point = detect_price_click_point_cv(price_button_screen) + + # 调试:生成点击点标注图 + if click_point: + debug_flag_path = price_button_screen.replace(".jpg", "_click_debug.jpg") + img_debug = read_image(price_button_screen) + if img_debug is not None: + cv2.circle(img_debug, (click_point[0], click_point[1]), 20, (0, 0, 255), -1) # 红色大圆点 + cv2.line(img_debug, (click_point[0]-40, click_point[1]), (click_point[0]+40, click_point[1]), (255, 255, 255), 3) + cv2.line(img_debug, (click_point[0], click_point[1]-40), (click_point[0], click_point[1]+40), (255, 255, 255), 3) + save_image(debug_flag_path, img_debug) + logger.info(f"点击点调试图已保存: {debug_flag_path}") try: - # 使用 uiautomator2 的图像识别点击 - match = d.image.match(template_path) - if match: - logger.info(f"找到价格按钮,坐标: {match['point']}") - d.image.click(template_path) + if click_point: + logger.info(f"CV 成功定位价格区域,点击坐标: {click_point}") + d.click(click_point[0], click_point[1]) await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) else: - logger.warning("未找到价格按钮模板,尝试备选方案:直接点击屏幕下方区域") - # 备选方案:如果模板匹配失败,尝试点击屏幕中下方 - w, h = d.window_size() - d.click(w // 2, int(h * 0.8)) + logger.warning("CV 未能定位价格区域,尝试模板匹配兜底...") + template_path = os.path.join(project_root, "Apps", "TeLaiDian", "Template", "jgxx.jpg") + match = d.image.match(template_path) + if match: + d.image.click(template_path) + else: + logger.warning("模板匹配也失败,执行坐标兜底...") + w, h = d.window_size() + d.click(w // 2, int(h * 0.45)) # 滑动后价格通常在屏幕中上部 await asyncio.sleep(WAIT_DETAIL_PAGE_LOAD) except Exception as e: - logger.error(f"点击价格按钮失败: {e}") + logger.error(f"点击价格区域失败: {e}") + finally: + if os.path.exists(price_button_screen): os.remove(price_button_screen) - # 4. 截图并分析价格表 - price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}.jpg") - prices = await self.read_image_kit.analyze_detail_price(price_screen_path) + # 4. 循环滑动抓取完整分时电价 + all_prices = [] + last_price_md5 = None + price_page_count = 0 + max_price_pages = 3 # 分时电价通常不会超过3页 + + logger.info("开始循环滑动抓取完整分时电价...") + while price_page_count < max_price_pages: + price_screen_path = take_screenshot(d, f"tld_detail_price_{int(time.time())}.jpg") + + # 校验页面是否发生滚动变化 + curr_md5 = get_image_content_md5(price_screen_path, top_ratio=0.2, bottom_ratio=0.2) + if curr_md5 == last_price_md5: + logger.info("价格页面内容无变化,判定已触底") + if os.path.exists(price_screen_path): os.remove(price_screen_path) + break + last_price_md5 = curr_md5 + + logger.info(f"正在分析价格详情页第 {price_page_count + 1} 页: {price_screen_path}") + page_prices = await self.read_image_kit.analyze_detail_price(price_screen_path) + + if page_prices: + # 简单去重:根据时段合并 + for p in page_prices: + if p not in all_prices: + all_prices.append(p) + + # 向上滑动一点点,继续抓取 + d.swipe_ext("up", scale=0.6) + await asyncio.sleep(1.5) + price_page_count += 1 + + # 清理临时截图 + if os.path.exists(price_screen_path): os.remove(price_screen_path) + # 5. 保存数据 - if prices: + if all_prices: station_name_clean = clean_station_name(station_name) - logger.info(f"场站 {station_name_clean} 提取到 {len(prices)} 条价格信息,准备保存...") - await self.service.save_station_data(station_name_clean, address, prices) + # 对价格按时间排序 + try: + all_prices.sort(key=lambda x: x.get('start', '00:00')) + except: + pass + logger.info(f"✅ 场站 {station_name_clean} 共提取到 {len(all_prices)} 条价格信息,准备保存...") + await self.service.save_station_data(station_name_clean, address, all_prices) else: - logger.warning(f"未能从 {price_screen_path} 提取到价格信息") + logger.warning(f"❌ 未能提取到任何价格信息,请检查页面识别逻辑") # 清理临时截图 for p in [first_screen_path, price_screen_path]: diff --git a/Apps/TeLaiDian/Kit.py b/Apps/TeLaiDian/Kit.py index 38a3c3d..6377f22 100644 --- a/Apps/TeLaiDian/Kit.py +++ b/Apps/TeLaiDian/Kit.py @@ -93,6 +93,64 @@ def save_image(path, img): logger.error(f"Error saving image {path}: {e}") return False +def detect_price_click_point_cv(image_path): + """ + 使用 HSV 颜色过滤定位详情页的橘红色价格区域,返回最左侧区域的中心点击点 + """ + img = read_image(image_path) + if img is None: + return None + + h, w = img.shape[:2] + # 1. 转换为 HSV 空间 + hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) + + # 2. 橘红色的 HSV 范围 (适配特来电价格颜色) + lower_orange = np.array([0, 150, 150]) + upper_orange = np.array([20, 255, 255]) + mask = cv2.inRange(hsv, lower_orange, upper_orange) + + # 3. 对掩码进行膨胀,连接数字 + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (50, 20)) + dilated = cv2.dilate(mask, kernel) + + # 4. 寻找轮廓 + contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + detected_areas = [] + for cnt in contours: + x, y, cw, ch = cv2.boundingRect(cnt) + # 1. 过滤掉宽度过大(可能是横幅广告)或过小(可能是杂点)的区域 + # 2. 价格区域 P0 通常在屏幕的中部,且宽度约为屏幕的一半 + if 200 < y < h * 0.8 and 100 < cw < w * 0.6 and ch > 30: + detected_areas.append([x, y, x + cw, y + ch]) + + if not detected_areas: + # 备选:如果 HSV 失败,尝试通过轮廓大小寻找 + # 1.1556 这种大数字通常会有很明显的轮廓 + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (30, 10)) + closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) + contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + for cnt in contours: + x, y, cw, ch = cv2.boundingRect(cnt) + if 200 < y < h * 0.6 and 150 < cw < 300 and 50 < ch < 150: + detected_areas.append([x, y, x + cw, y + ch]) + + if not detected_areas: + return None + + # 5. 按 X 轴排序,取最左边的区域 (即用户确认的 P0) + # 但要排除掉可能在最左侧的导航栏返回按钮等小元素,所以前面加了宽度限制 + detected_areas.sort(key=lambda b: b[0]) + target = detected_areas[0] + + center_x = (target[0] + target[2]) // 2 + center_y = (target[1] + target[3]) // 2 + + return [center_x, center_y] + def detect_cards_cv(image_path, top_ratio=0.40, bottom_ratio=0.12): """ 使用计算机图形学 (OpenCV) 检测列表中的场站卡片。 diff --git a/Apps/TeLaiDian/ReadImageKit.py b/Apps/TeLaiDian/ReadImageKit.py index f4c86e5..6370dd5 100644 --- a/Apps/TeLaiDian/ReadImageKit.py +++ b/Apps/TeLaiDian/ReadImageKit.py @@ -22,27 +22,43 @@ class ReadImageKit: async def analyze_detail_price(self, image_path): """ - 分析详情页截图,提取电价信息 + 分析详情页截图,提取电价信息,包括优惠价、PLUS价和挂牌价 """ prompt = """ - 分析这张充电站详情页截图,提取**电价时段表**。 - 请仔细寻找包含“时段”、“电价”、“服务费”或“总价”的表格或列表。 + 分析这张充电站价格详情页截图,提取**分时电价表**。 + 对于每个时段,请识别并提取以下所有价格信息(如果存在): + 1. 优惠价 (通常是红色或加粗的大字,作为默认 price) + 2. PLUS会员价 (标有 "PLUS" 标签的价格) + 3. 挂牌价 (标有 "挂牌价" 标签的价格) + 4. 电费 (Base electricity price) + 5. 服务费 (Service fee) 请提取每个时段的: - 1. 开始时间 (HH:MM) - 2. 结束时间 (HH:MM) - 3. 总电价 (元/度,包含电费和服务费) + - start: 开始时间 (HH:MM) + - end: 结束时间 (HH:MM) + - price: 优惠价 (元/度) + - plus_price: PLUS会员价 (元/度) + - market_price: 挂牌价 (元/度) + - elec_price: 电费 (元/度) + - service_price: 服务费 (元/度) 输出格式为 JSON 数组: [ { - "start": "00:00", - "end": "08:00", - "price": 1.23 + "start": "16:00", + "end": "21:00", + "price": 1.3435, + "plus_price": 1.3035, + "market_price": 1.4435, + "elec_price": 0.9435, + "service_price": 0.4000 }, ... ] - 如果无法识别任何价格信息,请返回空数组 []。 + 注意: + - 如果某个字段缺失,请设为 null。 + - 确保 price 包含电费和服务费的总和。 + - 如果无法识别任何价格信息,请返回空数组 []。 """ try: res_text = await self.vlm.analyze_image(image_path, prompt) diff --git a/Apps/TeLaiDian/Service.py b/Apps/TeLaiDian/Service.py index 1c39e53..54fd1b1 100644 --- a/Apps/TeLaiDian/Service.py +++ b/Apps/TeLaiDian/Service.py @@ -52,18 +52,38 @@ class TeLaiDianService: now = datetime.now() # 将价格转换为 24 小时的 schedule 格式 (0-23) - hourly_schedule = [0.0] * 24 + # 每个小时存储一个包含多重价格的字典,以记录优惠价、PLUS价和挂牌价 + hourly_schedule = [None] * 24 for p in prices: try: - start_hour = int(p['start'].split(':')[0]) - end_hour = int(p['end'].split(':')[0]) - price = float(p['price']) + start_parts = p['start'].split(':') + end_parts = p['end'].split(':') + start_hour = int(start_parts[0]) + end_hour = int(end_parts[0]) - # 处理跨天的情况(如 23:00 - 01:00) + # 处理 00:00 作为结束时间的情况 (表示 24:00) + if end_hour == 0 and (int(end_parts[1]) == 0 if len(end_parts) > 1 else True): + if start_hour != 0: + end_hour = 24 + + # 提取各项价格 + price_data = { + "price": float(p.get('price')) if p.get('price') is not None else 0.0, + "plus_price": float(p.get('plus_price')) if p.get('plus_price') is not None else None, + "market_price": float(p.get('market_price')) if p.get('market_price') is not None else None, + "elec_price": float(p.get('elec_price')) if p.get('elec_price') is not None else None, + "service_price": float(p.get('service_price')) if p.get('service_price') is not None else None + } + + # 填充对应的小时槽位 curr = start_hour - while curr != end_hour: - hourly_schedule[curr] = price - curr = (curr + 1) % 24 + # 如果是跨天的,比如 23:00 - 01:00 + if end_hour < start_hour: + end_hour += 24 + + while curr < end_hour: + hourly_schedule[curr % 24] = price_data + curr += 1 except Exception as e: logger.error(f"解析价格时段失败: {p}, error: {e}") @@ -80,7 +100,7 @@ class TeLaiDianService: valid_start_time=now ) - # 2. 保存价格 + # 2. 保存价格计划 schedule_id = self.generate_id() await self.station_price_schedule_model.save( session=session, @@ -89,6 +109,24 @@ class TeLaiDianService: schedule_json=hourly_schedule, valid_start_time=now ) + + # 3. 保存当前状态快照 (包含当前小时的价格) + current_hour = now.hour + current_price_info = hourly_schedule[current_hour] or {} + + status_id = self.generate_id() + await self.station_status_model.save( + session=session, + id=status_id, + station_hash=station_hash, + total_piles=None, # 特来电暂时没抓取总桩数 + free_piles=None, + piles_detail_json=None, + current_price=current_price_info.get('price'), + pro_price=current_price_info.get('plus_price'), + market_price=current_price_info.get('market_price'), + valid_start_time=now + ) await session.commit() logger.info(f"成功保存场站数据: {station_name}") diff --git a/Apps/TeLaiDian/Template/jgxx.jpg b/Apps/TeLaiDian/Template/jgxx.jpg index 14832c0..04b9789 100644 Binary files a/Apps/TeLaiDian/Template/jgxx.jpg and b/Apps/TeLaiDian/Template/jgxx.jpg differ diff --git a/Apps/TelaiDian/Config/__pycache__/Setting.cpython-310.pyc b/Apps/TelaiDian/Config/__pycache__/Setting.cpython-310.pyc index a88fd32..ce7032e 100644 Binary files a/Apps/TelaiDian/Config/__pycache__/Setting.cpython-310.pyc and b/Apps/TelaiDian/Config/__pycache__/Setting.cpython-310.pyc differ diff --git a/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc b/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc index 759c8c6..d1b7153 100644 Binary files a/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/Crawler.cpython-310.pyc differ diff --git a/Apps/TelaiDian/__pycache__/Kit.cpython-310.pyc b/Apps/TelaiDian/__pycache__/Kit.cpython-310.pyc index e7627b9..9d6e545 100644 Binary files a/Apps/TelaiDian/__pycache__/Kit.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/Kit.cpython-310.pyc differ diff --git a/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc b/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc index 1a65d39..5b568e1 100644 Binary files a/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/ReadImageKit.cpython-310.pyc differ diff --git a/Apps/TelaiDian/__pycache__/Service.cpython-310.pyc b/Apps/TelaiDian/__pycache__/Service.cpython-310.pyc index 6fc9d14..c66c2a5 100644 Binary files a/Apps/TelaiDian/__pycache__/Service.cpython-310.pyc and b/Apps/TelaiDian/__pycache__/Service.cpython-310.pyc differ diff --git a/Model/StationStatus.py b/Model/StationStatus.py index c6aefda..72b6ad4 100644 --- a/Model/StationStatus.py +++ b/Model/StationStatus.py @@ -6,7 +6,7 @@ class StationStatus: def __init__(self): pass - async def save(self, session, id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price=None, parking_info=None, distance=None, valid_start_time=None): + async def save(self, session, id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price=None, market_price=None, parking_info=None, distance=None, valid_start_time=None): if valid_start_time is None: valid_start_time = datetime.now() @@ -20,7 +20,7 @@ class StationStatus: # 1. Check current record select_sql = """ - SELECT total_piles, free_piles, piles_detail_json, current_price, pro_price, parking_info, distance + SELECT total_piles, free_piles, piles_detail_json, current_price, pro_price, market_price, parking_info, distance FROM t_station_status_scd WHERE station_hash = :station_hash AND is_current = 1 """ @@ -29,42 +29,35 @@ class StationStatus: current_row = result.fetchone() except Exception as e: # Check if it's a "column not found" error - if "Unknown column 'parking_info'" in str(e) or "no such column: parking_info" in str(e): - # Handle schema evolution if needed, or just proceed assuming None for parking_info comparison + if "Unknown column 'market_price'" in str(e) or "no such column: market_price" in str(e): current_row = None - # Or re-raise if we want to fail hard, but let's try to be robust - # For now, if column missing, we might fail on INSERT later anyway. - # So re-raising or logging might be better. - # But since I cannot easily alter table here, I will proceed with code update - # and assume user/I run the ALTER script. + else: raise e - raise e if current_row: # Check if changed - # Note: current_row values might be Decimal, need to convert for comparison row_total = current_row.total_piles row_free = current_row.free_piles row_json = current_row.piles_detail_json row_price = current_row.current_price row_pro_price = current_row.pro_price - row_parking = getattr(current_row, 'parking_info', None) # Safely get if column exists + row_market_price = getattr(current_row, 'market_price', None) + row_parking = getattr(current_row, 'parking_info', None) row_distance = getattr(current_row, 'distance', None) - # Normalize row_json for comparison (handle key order differences) + # Normalize row_json for comparison if row_json: try: if isinstance(row_json, str): row_json_obj = json.loads(row_json) row_json = json.dumps(row_json_obj, ensure_ascii=False, sort_keys=True) except Exception: - pass # Keep original if parse fails + pass - # Convert price to float if it is Decimal, for comparison - if row_price is not None: - row_price = float(row_price) - if row_pro_price is not None: - row_pro_price = float(row_pro_price) + # Convert prices for comparison + if row_price is not None: row_price = float(row_price) + if row_pro_price is not None: row_pro_price = float(row_pro_price) + if row_market_price is not None: row_market_price = float(row_market_price) # Simple comparison is_same = ( @@ -72,13 +65,13 @@ class StationStatus: row_free == free_piles and row_price == current_price and row_pro_price == pro_price and + row_market_price == market_price and row_json == piles_json_str and row_parking == parking_info and row_distance == distance ) if is_same: - # No change, skip insert return # Expire old record @@ -95,9 +88,9 @@ class StationStatus: # 2. Insert new record sql = """ INSERT INTO t_station_status_scd - (id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price, parking_info, distance, valid_start_time, is_current) + (id, station_hash, total_piles, free_piles, piles_detail_json, current_price, pro_price, market_price, parking_info, distance, valid_start_time, is_current) VALUES - (:id, :station_hash, :total_piles, :free_piles, :piles_detail_json, :current_price, :pro_price, :parking_info, :distance, :valid_start_time, 1) + (:id, :station_hash, :total_piles, :free_piles, :piles_detail_json, :current_price, :pro_price, :market_price, :parking_info, :distance, :valid_start_time, 1) """ await session.execute(text(sql), { "id": id, @@ -107,6 +100,7 @@ class StationStatus: "piles_detail_json": piles_json_str, "current_price": current_price, "pro_price": pro_price, + "market_price": market_price, "parking_info": parking_info, "distance": distance, "valid_start_time": valid_start_time diff --git a/Model/__pycache__/StationStatus.cpython-310.pyc b/Model/__pycache__/StationStatus.cpython-310.pyc index b682d52..4b6ce52 100644 Binary files a/Model/__pycache__/StationStatus.cpython-310.pyc and b/Model/__pycache__/StationStatus.cpython-310.pyc differ diff --git a/Tools/Sql/doris_ddl.sql b/Tools/Sql/doris_ddl.sql index 96a042a..7a6deca 100644 --- a/Tools/Sql/doris_ddl.sql +++ b/Tools/Sql/doris_ddl.sql @@ -49,8 +49,9 @@ CREATE TABLE IF NOT EXISTS t_station_status_scd ( `total_piles` INT COMMENT '总桩数', `free_piles` INT COMMENT '空闲桩数', `piles_detail_json` TEXT COMMENT '详细桩信息 (JSON格式)', - `current_price` DECIMAL(10, 4) COMMENT '当前价格快照', - `pro_price` DECIMAL(10, 4) COMMENT 'PRO会员专享价快照', + `current_price` DECIMAL(10, 4) COMMENT '当前价格快照 (优惠价)', + `pro_price` DECIMAL(10, 4) COMMENT 'PRO/PLUS会员专享价快照', + `market_price` DECIMAL(10, 4) COMMENT '挂牌价快照', `parking_info` VARCHAR(500) COMMENT '停车收费信息', `distance` VARCHAR(50) COMMENT '距离信息 (例如 5.3km)', `valid_end_time` DATETIME NOT NULL DEFAULT '9999-12-31 23:59:59' COMMENT '记录生效结束时间',