This commit is contained in:
HuangHai
2026-01-20 08:09:13 +08:00
parent b0b4533f57
commit 34501faafb
20 changed files with 2360 additions and 859 deletions

View File

@@ -1,29 +1,11 @@
import asyncio
import logging
import os
import uuid
import json
import redis
try:
from Config.Config import (
REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD,
REDIS_DECODE_RESPONSES, REDIS_MAX_CONNECTIONS
)
except ModuleNotFoundError:
import importlib.util
import os
_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_config_path = os.path.join(_root_dir, "Config", "Config.py")
_spec = importlib.util.spec_from_file_location("project_config_fallback", _config_path)
_cfg = importlib.util.module_from_spec(_spec)
assert _spec.loader is not None
_spec.loader.exec_module(_cfg)
REDIS_HOST = _cfg.REDIS_HOST
REDIS_PORT = _cfg.REDIS_PORT
REDIS_DB = _cfg.REDIS_DB
REDIS_PASSWORD = getattr(_cfg, "REDIS_PASSWORD", None)
REDIS_DECODE_RESPONSES = _cfg.REDIS_DECODE_RESPONSES
REDIS_MAX_CONNECTIONS = _cfg.REDIS_MAX_CONNECTIONS
from Config.Config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_DECODE_RESPONSES
# 创建logger实例
logger = logging.getLogger(__name__)
@@ -41,14 +23,13 @@ class RedisKit:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
async def _ensure_pool(cls):
async def _ensure_pool(self):
"""
确保Redis连接池已创建
"""
if cls._redis_pool is None:
async with cls._lock:
if cls._redis_pool is None:
if RedisKit._redis_pool is None:
async with RedisKit._lock:
if RedisKit._redis_pool is None:
try:
# 创建同步Redis连接池
sync_pool = redis.ConnectionPool(
@@ -56,32 +37,30 @@ class RedisKit:
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=REDIS_DECODE_RESPONSES,
max_connections=REDIS_MAX_CONNECTIONS
decode_responses=REDIS_DECODE_RESPONSES
)
# 创建Redis连接实例
cls._redis_pool = redis.Redis(
RedisKit._redis_pool = redis.Redis(
connection_pool=sync_pool,
encoding='utf-8',
decode_responses=REDIS_DECODE_RESPONSES
)
# 测试连接
await asyncio.to_thread(cls._redis_pool.ping)
await asyncio.to_thread(RedisKit._redis_pool.ping)
logger.info("Redis连接池创建成功")
except Exception as e:
logger.error(f"Redis连接池创建失败: {e}")
raise
@classmethod
async def get_connection(cls):
async def get_connection(self):
"""
获取Redis连接实例
Returns:
redis.Redis: Redis连接实例
"""
await cls._ensure_pool()
return cls._redis_pool
await self._ensure_pool()
return RedisKit._redis_pool
async def get_data(self, key):
"""
@@ -119,741 +98,16 @@ class RedisKit:
return False
async def set_data(self, key, value, expire=None):
"""
异步向Redis中写入数据
Args:
key (str): Redis键名
value (str): 要存储的值
expire (int, optional): 过期时间(秒),默认不过期
Returns:
bool: 写入成功返回True失败返回False
"""
try:
await self._ensure_pool()
if expire:
result = await asyncio.to_thread(RedisKit._redis_pool.setex, key, expire, value)
await asyncio.to_thread(RedisKit._redis_pool.set, key, value, ex=expire)
else:
result = await asyncio.to_thread(RedisKit._redis_pool.set, key, value)
return result
except Exception as e:
logger.error(f"向Redis写入数据失败(key={key}): {e}")
return False
async def delete_data(self, key):
"""
异步从Redis中删除数据
Args:
key (str): Redis键名
Returns:
bool: 删除成功返回True失败返回False
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.delete, key)
return result > 0 # delete返回删除的键数量大于0表示删除成功
except Exception as e:
logger.error(f"从Redis删除数据失败(key={key}): {e}")
return False
async def keys(self, pattern='*'):
"""
异步获取匹配模式的所有键
Args:
pattern (str): 匹配模式
Returns:
list: 匹配的键列表
"""
try:
await self._ensure_pool()
return await asyncio.to_thread(RedisKit._redis_pool.keys, pattern)
except Exception as e:
logger.error(f"获取Redis键列表失败(pattern={pattern}): {e}")
return []
async def delete(self, *names):
"""
异步删除一个或多个键
Args:
*names: 要删除的键名
Returns:
int: 删除的键数量
"""
try:
await self._ensure_pool()
if not names:
return 0
return await asyncio.to_thread(RedisKit._redis_pool.delete, *names)
except Exception as e:
logger.error(f"删除Redis键失败: {e}")
return 0
async def publish(self, channel, message):
"""
异步发布消息到Redis频道
Args:
channel (str): Redis频道名称
message (str): 要发布的消息
Returns:
int: 接收消息的订阅者数量
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.publish, channel, message)
return result
except Exception as e:
logger.error(f"Redis发布消息失败(channel={channel}): {e}")
return 0
async def subscribe(self, channel):
"""
异步订阅Redis频道
Args:
channel (str): Redis频道名称
Returns:
redis.client.PubSub: PubSub对象用于接收消息
"""
try:
await self._ensure_pool()
pubsub = RedisKit._redis_pool.pubsub()
await asyncio.to_thread(pubsub.subscribe, channel)
return pubsub
except Exception as e:
logger.error(f"Redis订阅频道失败(channel={channel}): {e}")
return None
async def save_user_token(self, person_id, platform, token, expire=86400):
"""
异步保存用户的token到Redis
Args:
person_id (str): 用户ID
platform (str): 平台
token (str): 生成的token
expire (int): 过期时间默认24小时
Returns:
bool: 保存成功返回True失败返回False
"""
try:
# 使用固定格式的keyuser_token:{person_id}:{platform}
key = f"user_token:{person_id}:{platform}"
return await self.set_data(key, token, expire)
except Exception as e:
logger.error(f"保存用户token失败(person_id={person_id}, platform={platform}): {e}")
return False
async def get_user_token(self, person_id, platform):
"""
异步获取用户在Redis中存储的token
Args:
person_id (str): 用户ID
platform (str): 平台
Returns:
str: 用户的token如果不存在或过期则返回None
"""
try:
key = f"user_token:{person_id}:{platform}"
return await self.get_data(key)
except Exception as e:
logger.error(f"获取用户token失败(person_id={person_id}, platform={platform}): {e}")
return None
async def validate_user_token(self, person_id, platform, token):
"""
异步验证用户的token是否与Redis中存储的一致
Args:
person_id (str): 用户ID
platform (str): 平台、终端
token (str): 待验证的token
Returns:
bool: token有效返回True无效返回False
"""
try:
stored_token = await self.get_user_token(person_id, platform)
# 如果没有存储的token或者存储的token与当前token不一致则token无效
return stored_token is not None and stored_token == token
except Exception as e:
logger.error(f"验证用户token失败(person_id={person_id}, platform={platform}): {e}")
return False
async def delete_user_token(self, person_id, platform=None):
"""
异步删除用户在Redis中存储的token
Args:
person_id (str): 用户ID
platform (str, optional): 平台如果为None则删除所有平台的token
Returns:
bool: 删除成功返回True失败返回False
"""
try:
await self._ensure_pool()
if platform:
# 删除特定平台的token
key = f"user_token:{person_id}:{platform}"
return await self.delete_data(key)
else:
# 删除所有平台的token使用模式匹配
pattern = f"user_token:{person_id}:*"
cursor = b'0'
deleted_count = 0
# 使用scan命令查找所有匹配的键
while cursor:
cursor, keys = await asyncio.to_thread(
RedisKit._redis_pool.scan,
cursor=cursor,
match=pattern,
count=100
)
if keys:
deleted = await asyncio.to_thread(
RedisKit._redis_pool.delete, *keys
)
deleted_count += deleted
return deleted_count > 0
except Exception as e:
logger.error(f"删除用户token失败(person_id={person_id}): {e}")
return False
async def list_push(self, key, value, expire=None):
"""
异步向Redis List中推入数据LPUSH
Args:
key (str): Redis键名
value (str): 要推入的值
expire (int, optional): 过期时间(秒)如果设置则同时为key设置过期时间
Returns:
int: 推入后list的长度失败返回-1
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.lpush, key, value)
# 如果设置了过期时间为key设置TTL
if expire and result > 0:
await asyncio.to_thread(RedisKit._redis_pool.expire, key, expire)
return result
except Exception as e:
logger.error(f"向Redis List推入数据失败(key={key}): {e}")
return -1
async def list_pop(self, key):
"""
异步从Redis List中弹出数据RPOP
Args:
key (str): Redis键名
Returns:
str: 弹出的值如果list为空或失败返回None
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.rpop, key)
return result
except Exception as e:
logger.error(f"从Redis List弹出数据失败(key={key}): {e}")
return None
async def list_length(self, key):
"""
异步获取Redis List的长度
Args:
key (str): Redis键名
Returns:
int: list的长度如果失败返回-1
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.llen, key)
return result
except Exception as e:
logger.error(f"获取Redis List长度失败(key={key}): {e}")
return -1
async def list_get_all(self, key):
"""
异步获取Redis List中的所有数据LRANGE 0 -1
Args:
key (str): Redis键名
Returns:
list: list中的所有数据如果失败返回空列表
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.lrange, key, 0, -1)
return result if result else []
except Exception as e:
logger.error(f"获取Redis List所有数据失败(key={key}): {e}")
return []
async def list_clear(self, key):
"""
异步清空Redis List删除key
Args:
key (str): Redis键名
Returns:
bool: 清空成功返回True失败返回False
"""
try:
return await self.delete_data(key)
except Exception as e:
logger.error(f"清空Redis List失败(key={key}): {e}")
return False
async def set_is_member(self, key, value):
"""
异步检查值是否在Redis集合中
Args:
key (str): Redis键名
value: 要检查的值
Returns:
bool: 值在集合中返回True不在或失败返回False
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.sismember, key, value)
return bool(result)
except Exception as e:
logger.error(f"检查值是否在Redis集合中失败(key={key}): {e}")
return False
async def set_add(self, key, value):
"""
异步向Redis集合中添加值
Args:
key (str): Redis键名
value: 要添加的值
Returns:
bool: 添加成功返回True失败返回False
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.sadd, key, value)
return result > 0 # sadd返回添加的元素数量大于0表示添加成功
except Exception as e:
logger.error(f"向Redis集合添加值失败(key={key}): {e}")
return False
async def set_remove(self, key, value):
"""
异步从Redis集合中移除值
Args:
key (str): Redis键名
value: 要移除的值
Returns:
bool: 移除成功返回True失败返回False
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.srem, key, value)
return result > 0 # srem返回移除的元素数量大于0表示移除成功
except Exception as e:
logger.error(f"从Redis集合移除值失败(key={key}): {e}")
return False
async def set_cardinality(self, key):
"""
异步获取Redis集合的成员数量
Args:
key (str): Redis键名
Returns:
int: 集合的成员数量如果失败返回0
"""
try:
await self._ensure_pool()
result = await asyncio.to_thread(RedisKit._redis_pool.scard, key)
return result
except Exception as e:
logger.error(f"获取Redis集合成员数量失败(key={key}): {e}")
return 0
# 分布式锁相关方法
async def acquire_lock(self, lock_name, timeout=300):
"""
获取分布式锁
Args:
lock_name (str): 锁的名称
timeout (int): 锁的超时时间(秒)默认300秒
Returns:
str: 锁ID如果获取失败返回None
"""
try:
await self._ensure_pool()
# 生成唯一锁ID
lock_id = str(uuid.uuid4())
key = f"lock:{lock_name}"
# 使用Redis原子操作设置锁
result = await asyncio.to_thread(
RedisKit._redis_pool.setnx, key, lock_id
)
if result:
# 设置锁的过期时间
await asyncio.to_thread(
RedisKit._redis_pool.expire, key, timeout
)
logger.info(f"获取分布式锁成功: {lock_name}, lock_id: {lock_id}")
return lock_id
else:
logger.info(f"获取分布式锁失败: {lock_name} (锁已存在)")
return None
except Exception as e:
logger.error(f"获取分布式锁失败: {lock_name}, 错误: {e}")
return None
async def release_lock(self, lock_name, lock_id):
"""
释放分布式锁
Args:
lock_name (str): 锁的名称
lock_id (str): 锁ID
Returns:
bool: 是否成功释放锁
"""
try:
await self._ensure_pool()
key = f"lock:{lock_name}"
# 使用Lua脚本原子性检查并释放锁
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = await asyncio.to_thread(
RedisKit._redis_pool.eval, lua_script, 1, key, lock_id
)
if result == 1:
logger.info(f"释放分布式锁成功: {lock_name}, lock_id: {lock_id}")
return True
else:
logger.info(f"释放分布式锁失败: {lock_name}, lock_id: {lock_id} (锁不存在或已被其他进程持有)")
return False
except Exception as e:
logger.error(f"释放分布式锁失败: {lock_name}, 错误: {e}")
return False
async def renew_lock(self, lock_name, lock_id, timeout=300):
"""
续约分布式锁
Args:
lock_name (str): 锁的名称
lock_id (str): 锁ID
timeout (int): 续约时间(秒)默认300秒
Returns:
bool: 是否成功续约
"""
try:
await self._ensure_pool()
key = f"lock:{lock_name}"
# 使用Lua脚本原子性检查并续约锁
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
"""
result = await asyncio.to_thread(
RedisKit._redis_pool.eval, lua_script, 1, key, lock_id, timeout
)
if result == 1:
logger.info(f"续约分布式锁成功: {lock_name}, lock_id: {lock_id}")
return True
else:
logger.info(f"续约分布式锁失败: {lock_name}, lock_id: {lock_id} (锁不存在或已被其他进程持有)")
return False
except Exception as e:
logger.error(f"续约分布式锁失败: {lock_name}, 错误: {e}")
return False
# 任务进度监控相关方法
async def set_task_progress(self, task_id, progress, status="processing", message=""):
"""
设置任务进度
Args:
task_id (str): 任务ID
progress (int): 进度百分比(0-100)
status (str): 任务状态(waiting, processing, completed, failed)
message (str): 任务消息
Returns:
bool: 设置成功返回True失败返回False
"""
try:
await self._ensure_pool()
key = f"task:progress:{task_id}"
# 构造进度数据
progress_data = {
"task_id": task_id,
"progress": min(100, max(0, progress)), # 确保进度在0-100之间
"status": status,
"message": message,
"update_time": asyncio.get_event_loop().time()
}
# 序列化进度数据
progress_json = json.dumps(progress_data, ensure_ascii=False)
# 保存到Redis
await asyncio.to_thread(
RedisKit._redis_pool.set, key, progress_json
)
# 设置过期时间为24小时
await asyncio.to_thread(
RedisKit._redis_pool.expire, key, 86400
)
logger.info(f"设置任务进度成功: {task_id}, 进度: {progress}%, 状态: {status}")
await asyncio.to_thread(RedisKit._redis_pool.set, key, value)
return True
except Exception as e:
logger.error(f"设置任务进度失败: {task_id}, 错误: {e}")
logger.error(f"保存数据到Redis失败(key={key}): {e}")
return False
async def get_task_progress(self, task_id):
"""
获取任务进度
Args:
task_id (str): 任务ID
Returns:
dict: 任务进度数据如果不存在返回None
"""
try:
await self._ensure_pool()
key = f"task:progress:{task_id}"
# 从Redis获取进度数据
progress_json = await asyncio.to_thread(
RedisKit._redis_pool.get, key
)
if progress_json:
return json.loads(progress_json)
else:
return None
except Exception as e:
logger.error(f"获取任务进度失败: {task_id}, 错误: {e}")
return None
async def complete_task(self, task_id, success=True, message=""):
"""
完成任务
Args:
task_id (str): 任务ID
success (bool): 任务是否成功完成
message (str): 完成消息
Returns:
bool: 设置成功返回True失败返回False
"""
status = "completed" if success else "failed"
progress = 100 if success else 0
return await self.set_task_progress(task_id, progress, status, message)
async def delete_task_progress(self, task_id):
"""
删除任务进度记录
Args:
task_id (str): 任务ID
Returns:
bool: 删除成功返回True失败返回False
"""
try:
await self._ensure_pool()
key = f"task:progress:{task_id}"
# 从Redis删除进度记录
result = await asyncio.to_thread(
RedisKit._redis_pool.delete, key
)
logger.info(f"删除任务进度记录成功: {task_id}")
return result > 0
except Exception as e:
logger.error(f"删除任务进度记录失败: {task_id}, 错误: {e}")
return False
@classmethod
async def close(cls):
"""
关闭Redis连接池
"""
if cls._redis_pool:
# 同步Redis连接池不需要等待关闭
cls._redis_pool.connection_pool.disconnect()
cls._redis_pool = None
logger.info("Redis连接池已关闭")
# 创建全局实例
redisKit = RedisKit()
async def main():
"""
测试异步Redis操作的main函数
"""
print("开始测试异步Redis工具类...")
try:
# 测试基本数据操作
print("\n=== 测试基本数据操作 ===")
# 测试设置和获取数据
test_key = "test_key"
test_value = "test_value_123"
result = await redisKit.set_data(test_key, test_value, expire=60)
print(f"设置数据结果: {result}")
get_value = await redisKit.get_data(test_key)
print(f"获取数据结果: {get_value}")
# 测试用户token操作
print("\n=== 测试用户token操作 ===")
# 保存用户token
person_id = "user123"
platform = "web"
token = "jwt_token_xyz789"
save_result = await redisKit.save_user_token(person_id, platform, token, expire=120)
print(f"保存token结果: {save_result}")
# 获取用户token
stored_token = await redisKit.get_user_token(person_id, platform)
print(f"获取token结果: {stored_token}")
# 验证token
validate_result = await redisKit.validate_user_token(person_id, platform, token)
print(f"验证token结果: {validate_result}")
# 验证错误的token
wrong_token = "wrong_token"
wrong_validate_result = await redisKit.validate_user_token(person_id, platform, wrong_token)
print(f"验证错误token结果: {wrong_validate_result}")
# 测试删除操作
print("\n=== 测试删除操作 ===")
# 删除测试数据
delete_result = await redisKit.delete_data(test_key)
print(f"删除数据结果: {delete_result}")
# 删除用户token
delete_token_result = await redisKit.delete_user_token(person_id, platform)
print(f"删除token结果: {delete_token_result}")
# 测试批量token删除功能
print("\n=== 测试批量token删除功能 ===")
# 保存多个平台的token
platforms = ["android", "ios", "web"]
for i, platform in enumerate(platforms):
await redisKit.save_user_token(f"user{i + 1}", platform, f"token{i + 1}", expire=300)
print(f"保存用户{i+1}{platform}平台token")
# 删除特定用户的所有token
user_to_delete = "user2"
batch_delete_result = await redisKit.delete_user_token(user_to_delete)
print(f"批量删除{user_to_delete}的所有token结果: {batch_delete_result}")
# 验证删除结果
for platform in platforms:
token_check = await redisKit.get_user_token(user_to_delete, platform)
print(f"删除后{user_to_delete}-{platform}平台的token状态: {'已删除' if token_check is None else '存在'}")
print("\n=== 所有测试完成 ===")
except Exception as e:
print(f"测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理测试数据并关闭连接池
print("\n清理测试数据并关闭连接池...")
# 清理测试用的数据
test_keys_to_cleanup = ["test_key"]
for key in test_keys_to_cleanup:
await redisKit.delete_data(key)
# 清理测试用户token
cleanup_users = ["user123", "user1", "user2", "user3"]
for user in cleanup_users:
try:
await redisKit.delete_user_token(user)
except:
pass # 忽略清理过程中的错误
# 关闭Redis连接池
await redisKit.close()
print("Redis连接池已关闭")
if __name__ == "__main__":
asyncio.run(main())
# ȫ<><C8AB>ʵ<EFBFBD><CAB5>
redisKit = RedisKit()

View File

@@ -1,30 +1,30 @@
import asyncio
import logging
import sys
from DbKit.Db import Db
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def patch():
# 在Windows上使用SelectorEventLoopPolicy可以减少SSL错误
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 为Db类添加临时的异步上下文管理器支持
def patch_db_context_manager():
# 为Db类添加异步上下文管理器支持
if not hasattr(Db, '__aenter__'):
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 简单的清理,不做实际操作,避免事件循环问题
return False
# 动态添加方法
Db.__aenter__ = __aenter__
Db.__aexit__ = __aexit__
# 应用补丁
patch_db_context_manager()
import asyncio
import logging
import sys
from DbKit.Db import Db
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def patch():
# 在Windows上使用SelectorEventLoopPolicy可以减少SSL错误
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 为Db类添加临时的异步上下文管理器支持
def patch_db_context_manager():
# 为Db类添加异步上下文管理器支持
if not hasattr(Db, '__aenter__'):
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 简单的清理,不做实际操作,避免事件循环问题
return False
# 动态添加方法
Db.__aenter__ = __aenter__
Db.__aexit__ = __aexit__
# 应用补丁
patch_db_context_manager()