다른 명령
잔글 Devcafe님이 Python oracle connetion pool 문서를 Python oracle connection pool 문서로 이동했습니다 |
편집 요약 없음 |
||
| (같은 사용자의 중간 판 하나는 보이지 않습니다) | |||
| 1번째 줄: | 1번째 줄: | ||
요구사항 요약 | 요구사항 요약 | ||
| 153번째 줄: | 149번째 줄: | ||
• 커넥션 풀 재시도 로직 | • 커넥션 풀 재시도 로직 | ||
• 비동기(asyncio) 구조로 전환 | • 비동기(asyncio) 구조로 전환 | ||
[[category:python]] | |||
[[category:oracle]] | |||
2025년 6월 24일 (화) 12:28 기준 최신판
요구사항 요약 1. DB 접속 정보는 외부 파일에서 읽음 (예: db_config.json) 2. 여러 개의 DB에 각각 Connection Pool을 생성하여 관리 3. 각 DB를 식별자(db_key)로 구분해 사용 4. import 시 자동으로 모든 DB 접속 준비 5. execute(db_key, query)로 쿼리 실행 → 자동 커넥션 반납
⸻
✅ 1. 외부 설정 파일 (예: db_config.json)
{
"DB1": {
"user": "user1",
"password": "pass1",
"dsn": "host1:1521/service1"
},
"DB2": {
"user": "user2",
"password": "pass2",
"dsn": "host2:1521/service2"
}
}
⸻
✅ 2. 모듈 코드: oracle_pool.py
- oracle_pool.py
import cx_Oracle import json import os
class OracleMultiPool:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(OracleMultiPool, cls).__new__(cls)
return cls._instance
def __init__(self, config_path='db_config.json', min=2, max=5, increment=1):
if hasattr(self, "_initialized") and self._initialized:
return
self.pools = {}
self._initialized = False
# JSON 설정 읽기
if not os.path.exists(config_path):
print(f"[Error] DB config file '{config_path}' not found.")
return
try:
with open(config_path, 'r') as f:
config = json.load(f)
except Exception as e:
print(f"[Error] Failed to load DB config: {e}")
return
# 각 DB에 대해 커넥션 풀 생성
for db_key, db_info in config.items():
try:
pool = cx_Oracle.SessionPool(
user=db_info["user"],
password=db_info["password"],
dsn=db_info["dsn"],
min=min,
max=max,
increment=increment,
encoding="UTF-8"
)
self.pools[db_key] = pool
print(f"[Connected] Pool created for {db_key}")
except cx_Oracle.Error as e:
print(f"[Error] Failed to connect {db_key}: {e}")
self._initialized = True
def execute(self, db_key, query, params=None, fetch=True):
if db_key not in self.pools:
print(f"[Error] No connection pool for '{db_key}'")
return None
conn = None
try:
conn = self.pools[db_key].acquire()
cursor = conn.cursor()
cursor.execute(query, params or {})
result = cursor.fetchall() if fetch else None
cursor.close()
return result
except cx_Oracle.Error as e:
print(f"[{db_key}] Query error: {e}")
return None
finally:
if conn:
self.pools[db_key].release(conn)
def close_all(self):
for db_key, pool in self.pools.items():
pool.close()
print(f"[Closed] Pool for {db_key}")
⸻
✅ 3. 사용 예: main.py
from oracle_pool import OracleMultiPool
- 클래스 인스턴스 생성 시 모든 DB에 접속
db = OracleMultiPool(config_path='db_config.json')
- DB1에 쿼리 실행
result = db.execute("DB1", "SELECT SYSDATE FROM dual") print("[DB1] Result:", result)
- DB2에 쿼리 실행
result = db.execute("DB2", "SELECT USER FROM dual") print("[DB2] Result:", result)
- 모든 커넥션 풀 종료
db.close_all()
⸻
✅ 확장성 & 특징 요약
항목 설명 DB 추가 db_config.json에 항목만 추가하면 자동 반영 쿼리 실행 db.execute("DB1", query) 식으로 간단히 사용 재사용 싱글톤 구조로 모든 곳에서 동일 인스턴스 사용 커넥션 자동 관리 acquire → execute → release 자동 처리 설정 분리 민감한 정보는 코드 외부 JSON으로 분리
⸻
필요하면 다음도 지원 가능합니다: • INSERT/UPDATE 트랜잭션 (commit 자동/수동) • 커넥션 풀 재시도 로직 • 비동기(asyncio) 구조로 전환