import pymysql import config import atexit class DatabaseConnection: _instance = None _connection = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(DatabaseConnection, cls).__new__(cls, *args, **kwargs) # 在这里初始化连接(注意:这里应该使用更安全的配置管理方式) cls._connection = pymysql.connect( host=config.DB_HOST, user=config.DB_USER, port=config.DB_PORT, password=config.DB_PASSWORD, database=config.DB_NAME ) return cls._instance def get_connection(self): return self._connection @classmethod def close_connection(cls): if cls._connection: cls._connection.close() cls._connection = None @classmethod def execute_query(cls, query, params=None): """执行查询并返回结果""" try: with cls._connection.cursor() as cursor: cursor.execute(query, params) result = cursor.fetchall() return result except pymysql.MySQLError as e: print(f"Error: {e}") finally: pass @classmethod def execute_commit(cls, query, params=None): """执行查询并返回结果""" try: with cls._connection.cursor() as cursor: cursor.execute(query, params) cls._connection.commit() except pymysql.MySQLError as e: print(f"Error: {e}") cls._connection.rollback() finally: pass # 注册关闭连接的函数,以便在脚本退出时自动调用 atexit.register(DatabaseConnection.close_connection)