61 lines
1.8 KiB
Python
61 lines
1.8 KiB
Python
import pymysql
|
|
import config
|
|
import atexit
|
|
|
|
|
|
class DatabaseConnection:
|
|
_instance = None
|
|
_connection = None
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if not cls._instance:
|
|
cls._instance = super(DatabaseConnection, cls).__new__(cls, *args, **kwargs)
|
|
# 在这里初始化连接(注意:这里应该使用更安全的配置管理方式)
|
|
cls._connection = pymysql.connect(
|
|
host=config.DB_HOST,
|
|
user=config.DB_USER,
|
|
port=config.DB_PORT,
|
|
password=config.DB_PASSWORD,
|
|
database=config.DB_NAME
|
|
)
|
|
return cls._instance
|
|
|
|
def get_connection(self):
|
|
return self._connection
|
|
|
|
@classmethod
|
|
def close_connection(cls):
|
|
if cls._connection:
|
|
cls._connection.close()
|
|
cls._connection = None
|
|
|
|
@classmethod
|
|
def execute_query(cls, query, params=None):
|
|
"""执行查询并返回结果"""
|
|
try:
|
|
with cls._connection.cursor() as cursor:
|
|
cursor.execute(query, params)
|
|
result = cursor.fetchall()
|
|
return result
|
|
except pymysql.MySQLError as e:
|
|
print(f"Error: {e}")
|
|
finally:
|
|
pass
|
|
|
|
@classmethod
|
|
def execute_commit(cls, query, params=None):
|
|
"""执行查询并返回结果"""
|
|
try:
|
|
with cls._connection.cursor() as cursor:
|
|
cursor.execute(query, params)
|
|
cls._connection.commit()
|
|
except pymysql.MySQLError as e:
|
|
print(f"Error: {e}")
|
|
cls._connection.rollback()
|
|
finally:
|
|
pass
|
|
|
|
|
|
# 注册关闭连接的函数,以便在脚本退出时自动调用
|
|
atexit.register(DatabaseConnection.close_connection)
|