다른 명령
새 문서: 좋습니다! 말씀하신 기능을 모두 포함하는 강화된 버전의 OracleMultiPool 클래스를 아래에 제공합니다. ⸻ ✅ 주요 기능 요약 기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync) ⸻ ✅ 1.... |
편집 요약 없음 |
||
| (같은 사용자의 중간 판 하나는 보이지 않습니다) | |||
| 1번째 줄: | 1번째 줄: | ||
⸻ | ⸻ | ||
| 19번째 줄: | 19번째 줄: | ||
✅ 1. 설정 파일: db_config.json | ✅ 1. 설정 파일: db_config.json | ||
<source lang=js> | |||
{ | { | ||
"DB1": { | "DB1": { | ||
| 31번째 줄: | 32번째 줄: | ||
} | } | ||
} | } | ||
</source> | |||
| 39번째 줄: | 41번째 줄: | ||
동기 + 비동기 + 자동 커밋 + 재시도 지원 | 동기 + 비동기 + 자동 커밋 + 재시도 지원 | ||
<source lang=python> | |||
# oracle_pool.py | # oracle_pool.py | ||
| 146번째 줄: | 148번째 줄: | ||
await loop.run_in_executor(None, self.sync_pool.close_all) | await loop.run_in_executor(None, self.sync_pool.close_all) | ||
</source> | |||
| 153번째 줄: | 156번째 줄: | ||
동기 버전 (main.py) | 동기 버전 (main.py) | ||
<source lang=python> | |||
from oracle_pool import OracleMultiPool | from oracle_pool import OracleMultiPool | ||
| 166번째 줄: | 169번째 줄: | ||
db.close_all() | db.close_all() | ||
</source> | |||
| 171번째 줄: | 175번째 줄: | ||
비동기 버전 (main_async.py) | 비동기 버전 (main_async.py) | ||
<source lang=python> | |||
import asyncio | import asyncio | ||
from oracle_pool import OracleMultiPool, OracleMultiPoolAsync | from oracle_pool import OracleMultiPool, OracleMultiPoolAsync | ||
| 187번째 줄: | 191번째 줄: | ||
asyncio.run(main()) | asyncio.run(main()) | ||
</source> | |||
[[category:python]] | |||
2025년 5월 21일 (수) 06:34 기준 최신판
⸻
✅ 주요 기능 요약
기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync)
⸻
✅ 1. 설정 파일: db_config.json
{
"DB1": {
"user": "user1",
"password": "pass1",
"dsn": "host1:1521/service1"
},
"DB2": {
"user": "user2",
"password": "pass2",
"dsn": "host2:1521/service2"
}
}
⸻
✅ 2. 모듈: oracle_pool.py
동기 + 비동기 + 자동 커밋 + 재시도 지원
# oracle_pool.py
import cx_Oracle
import json
import os
import asyncio
class OracleMultiPool:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, config_path='db_config.json', min=2, max=5, increment=1, retry=3):
if hasattr(self, "_initialized") and self._initialized:
return
self.pools = {}
self._initialized = False
if not os.path.exists(config_path):
print(f"[Error] DB config file '{config_path}' not found.")
return
try:
with open(config_path, 'r') as f:
config = json.load(f)
except Exception as e:
print(f"[Error] Failed to load DB config: {e}")
return
for db_key, db_info in config.items():
for attempt in range(retry):
try:
pool = cx_Oracle.SessionPool(
user=db_info["user"],
password=db_info["password"],
dsn=db_info["dsn"],
min=min,
max=max,
increment=increment,
encoding="UTF-8"
)
self.pools[db_key] = pool
print(f"[Connected] Pool created for {db_key}")
break
except cx_Oracle.Error as e:
print(f"[Retry {attempt+1}] Failed to connect {db_key}: {e}")
if attempt == retry - 1:
print(f"[Error] Giving up on {db_key}")
self._initialized = True
def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
if db_key not in self.pools:
print(f"[Error] No pool for '{db_key}'")
return None
conn = None
try:
conn = self.pools[db_key].acquire()
cursor = conn.cursor()
cursor.execute(query, params or {})
if autocommit:
conn.commit()
result = cursor.fetchall() if fetch else None
cursor.close()
return result
except cx_Oracle.Error as e:
print(f"[{db_key}] Query error: {e}")
return None
finally:
if conn:
self.pools[db_key].release(conn)
def close_all(self):
for db_key, pool in self.pools.items():
pool.close()
print(f"[Closed] Pool for {db_key}")
# 선택사항: 비동기 버전 (asyncio)
class OracleMultiPoolAsync:
def __init__(self, sync_pool: OracleMultiPool):
self.sync_pool = sync_pool
async def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.sync_pool.execute,
db_key,
query,
params,
fetch,
autocommit
)
async def close_all(self):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.sync_pool.close_all)
⸻
✅ 3. 사용 예제
동기 버전 (main.py)
from oracle_pool import OracleMultiPool
db = OracleMultiPool()
result = db.execute("DB1", "SELECT SYSDATE FROM dual")
print("DB1:", result)
# 자동 커밋 예 (INSERT/UPDATE)
db.execute("DB2", "UPDATE users SET active = 1 WHERE id = :id", params={"id": 5}, fetch=False, autocommit=True)
db.close_all()
⸻
비동기 버전 (main_async.py)
import asyncio
from oracle_pool import OracleMultiPool, OracleMultiPoolAsync
async def main():
sync_db = OracleMultiPool()
async_db = OracleMultiPoolAsync(sync_db)
result = await async_db.execute("DB1", "SELECT SYSDATE FROM dual")
print("Async DB1:", result)
await async_db.execute("DB2", "UPDATE users SET active = 0 WHERE id = :id", params={"id": 1}, fetch=False, autocommit=True)
await async_db.close_all()
asyncio.run(main())