다른 명령
⸻
✅ 주요 기능 요약
| 기능 | 지원 여부 |
| --- | --- |
| 외부 JSON 설정 파일 | O (db_config.json) |
| 여러 DB 커넥션 풀 관리 | O (key로 구분) |
| 쿼리 실행 후 자동 커넥션 반납 | O |
| INSERT/UPDATE 자동 커밋 | O (autocommit=True) |
| 재시도 로직 (Pool 생성 실패 시) | O |
| 비동기 지원 (asyncio) | O (OracleMultiPoolAsync) |
⸻
✅ 1. 설정 파일: db_config.json
{ "DB1": { "user": "user1", "password": "pass1", "dsn": "host1:1521/service1" }, "DB2": { "user": "user2", "password": "pass2", "dsn": "host2:1521/service2" } }
⸻
✅ 2. 모듈: oracle_pool.py
동기 + 비동기 + 자동 커밋 + 재시도 지원
# oracle_pool.py
import cx_Oracle
import json
import os
import asyncio


class OracleMultiPool:
    """Singleton that owns one cx_Oracle session pool per configured database.

    The JSON config maps an arbitrary key (e.g. ``"DB1"``) to an object with
    ``user``, ``password`` and ``dsn`` entries. Every key whose pool could be
    created is stored in ``self.pools`` and can be addressed through
    :meth:`execute`.

    Errors are reported with ``print`` and never raised to the caller: a
    missing/invalid config leaves ``self.pools`` empty, and a database that
    cannot be reached is simply absent from ``self.pools``.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Every construction returns the same object; __init__ decides
        # whether the pools still have to be built.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, config_path='db_config.json', min=2, max=5, increment=1, retry=3):
        """Load the config and create a session pool for each entry.

        Args:
            config_path: Path of the JSON file describing the databases.
            min: Value passed as ``min`` to ``cx_Oracle.SessionPool``.
            max: Value passed as ``max`` to ``cx_Oracle.SessionPool``.
            increment: Value passed as ``increment`` to ``cx_Oracle.SessionPool``.
            retry: Number of pool-creation attempts per database.

        NOTE: ``min``/``max`` shadow the builtins, but they are part of the
        public keyword interface and are therefore kept as-is.
        """
        # __init__ runs on every OracleMultiPool() call because of the
        # singleton __new__; skip the work once the pools exist.
        if getattr(self, "_initialized", False):
            return
        self.pools = {}
        self._initialized = False

        if not os.path.exists(config_path):
            print(f"[Error] DB config file '{config_path}' not found.")
            return

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"[Error] Failed to load DB config: {e}")
            return

        if not isinstance(config, dict):
            print("[Error] Failed to load DB config: top-level JSON value must be an object")
            return

        for db_key, db_info in config.items():
            # Validate the entry up front: a missing field is not a
            # connection problem, so retrying it would be pointless, and it
            # must not abort the set-up of the remaining databases.
            try:
                credentials = {
                    field: db_info[field] for field in ("user", "password", "dsn")
                }
            except (KeyError, TypeError) as e:
                print(f"[Error] Invalid config entry for {db_key}: {e!r}")
                continue

            for attempt in range(retry):
                try:
                    pool = cx_Oracle.SessionPool(
                        user=credentials["user"],
                        password=credentials["password"],
                        dsn=credentials["dsn"],
                        min=min,
                        max=max,
                        increment=increment,
                        # OracleMultiPoolAsync runs execute() on executor
                        # threads, so the pool must be usable from several
                        # threads. NOTE(review): newer cx_Oracle releases
                        # reportedly ignore this flag and always enable
                        # threaded mode - confirm for the pinned version.
                        threaded=True,
                        encoding="UTF-8"
                    )
                except cx_Oracle.Error as e:
                    print(f"[Retry {attempt+1}] Failed to connect {db_key}: {e}")
                    if attempt == retry - 1:
                        print(f"[Error] Giving up on {db_key}")
                else:
                    self.pools[db_key] = pool
                    print(f"[Connected] Pool created for {db_key}")
                    break

        self._initialized = True

    def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
        """Run one statement on a pooled connection and return its rows.

        Args:
            db_key: Key of the target database as used in the config file.
            query: SQL text passed to ``cursor.execute``.
            params: Bind parameters; ``None`` (or any falsy value) is sent
                as an empty dict.
            fetch: When true and the statement produced a result set,
                return ``cursor.fetchall()``.
            autocommit: When true, ``conn.commit()`` is called after the
                statement executed.

        Returns:
            The fetched rows, or ``None`` when nothing was fetched, the key
            is unknown, or a ``cx_Oracle.Error`` occurred (the error is
            printed, not raised).

        NOTE(review): with ``autocommit=False`` nothing here commits;
        presumably the pool discards uncommitted work when the connection
        is released - confirm against the cx_Oracle documentation.
        """
        pool = self.pools.get(db_key)
        if pool is None:
            print(f"[Error] No pool for '{db_key}'")
            return None

        conn = None
        try:
            conn = pool.acquire()
            cursor = conn.cursor()
            try:
                cursor.execute(query, params or {})
                if autocommit:
                    conn.commit()
                # description is None for statements without a result set
                # (INSERT/UPDATE/...). Calling fetchall() there raises, which
                # used to report an already committed DML as a query error.
                if fetch and cursor.description is not None:
                    return cursor.fetchall()
                return None
            finally:
                # Close the cursor on the error path as well.
                cursor.close()
        except cx_Oracle.Error as e:
            print(f"[{db_key}] Query error: {e}")
            return None
        finally:
            # Always hand the connection back to the pool it came from.
            if conn is not None:
                pool.release(conn)

    def close_all(self):
        """Close every pool and reset the singleton.

        A pool that fails to close is reported and does not prevent the
        remaining pools from being closed. Afterwards ``self.pools`` is
        empty and the next ``OracleMultiPool()`` call builds fresh pools.
        """
        for db_key, pool in list(self.pools.items()):
            try:
                pool.close()
            except cx_Oracle.Error as e:
                print(f"[Error] Failed to close pool for {db_key}: {e}")
            else:
                print(f"[Closed] Pool for {db_key}")
        # Drop the references so execute() reports "No pool" instead of
        # trying to acquire from a closed pool.
        self.pools.clear()
        self._initialized = False


# Optional: asynchronous version (asyncio)
class OracleMultiPoolAsync:
    """Awaitable facade that runs OracleMultiPool calls in the default executor."""

    def __init__(self, sync_pool: OracleMultiPool):
        """Wrap *sync_pool*; all calls are delegated to it."""
        self.sync_pool = sync_pool

    async def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
        """Await ``sync_pool.execute`` with the same arguments and result."""
        # Inside a coroutine the running loop is the one to use;
        # get_event_loop() is meant for code outside of a running loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.sync_pool.execute, db_key, query, params, fetch, autocommit
        )

    async def close_all(self):
        """Await ``sync_pool.close_all`` on the default executor."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.sync_pool.close_all)
⸻
✅ 3. 사용 예제
동기 버전 (main.py)
# Synchronous usage example: one query, one auto-committed update, then
# shut the pools down.
from oracle_pool import OracleMultiPool

db = OracleMultiPool()
try:
    result = db.execute("DB1", "SELECT SYSDATE FROM dual")
    print("DB1:", result)

    # Auto-commit example (INSERT/UPDATE)
    db.execute(
        "DB2",
        "UPDATE users SET active = 1 WHERE id = :id",
        params={"id": 5},
        fetch=False,
        autocommit=True,
    )
finally:
    # Close the pools even if one of the calls above raises.
    db.close_all()
⸻
비동기 버전 (main_async.py)
# Asynchronous usage example: the same calls as the synchronous script,
# awaited through OracleMultiPoolAsync.
import asyncio

from oracle_pool import OracleMultiPool, OracleMultiPoolAsync


async def main():
    """Run one query and one auto-committed update, then close the pools."""
    sync_db = OracleMultiPool()
    async_db = OracleMultiPoolAsync(sync_db)
    try:
        result = await async_db.execute("DB1", "SELECT SYSDATE FROM dual")
        print("Async DB1:", result)

        await async_db.execute(
            "DB2",
            "UPDATE users SET active = 0 WHERE id = :id",
            params={"id": 1},
            fetch=False,
            autocommit=True,
        )
    finally:
        # Close the pools even if one of the awaited calls raises.
        await async_db.close_all()


asyncio.run(main())