메뉴 여닫기
개인 메뉴 토글
로그인하지 않음
만약 지금 편집한다면 당신의 IP 주소가 공개될 수 있습니다.

Python oracle 커넥션 풀

데브카페
Devcafe (토론 | 기여)님의 2025년 5월 21일 (수) 06:32 판 (새 문서: 좋습니다! 말씀하신 기능을 모두 포함하는 강화된 버전의 OracleMultiPool 클래스를 아래에 제공합니다. ⸻ ✅ 주요 기능 요약 기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync) ⸻ ✅ 1....)
(차이) ← 이전 판 | 최신판 (차이) | 다음 판 → (차이)

좋습니다! 말씀하신 기능을 모두 포함하는 강화된 버전의 OracleMultiPool 클래스를 아래에 제공합니다.

✅ 주요 기능 요약

기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync)


✅ 1. 설정 파일: db_config.json

{

 "DB1": {
   "user": "user1",
   "password": "pass1",
   "dsn": "host1:1521/service1"
 },
 "DB2": {
   "user": "user2",
   "password": "pass2",
   "dsn": "host2:1521/service2"
 }

}


✅ 2. 모듈: oracle_pool.py

동기 + 비동기 + 자동 커밋 + 재시도 지원

  1. oracle_pool.py

import cx_Oracle import json import os import asyncio

class OracleMultiPool:

   _instance = None
   def __new__(cls, *args, **kwargs):
       if not cls._instance:
           cls._instance = super().__new__(cls)
       return cls._instance
   def __init__(self, config_path='db_config.json', min=2, max=5, increment=1, retry=3):
       if hasattr(self, "_initialized") and self._initialized:
           return
       self.pools = {}
       self._initialized = False
       if not os.path.exists(config_path):
           print(f"[Error] DB config file '{config_path}' not found.")
           return
       try:
           with open(config_path, 'r') as f:
               config = json.load(f)
       except Exception as e:
           print(f"[Error] Failed to load DB config: {e}")
           return
       for db_key, db_info in config.items():
           for attempt in range(retry):
               try:
                   pool = cx_Oracle.SessionPool(
                       user=db_info["user"],
                       password=db_info["password"],
                       dsn=db_info["dsn"],
                       min=min,
                       max=max,
                       increment=increment,
                       encoding="UTF-8"
                   )
                   self.pools[db_key] = pool
                   print(f"[Connected] Pool created for {db_key}")
                   break
               except cx_Oracle.Error as e:
                   print(f"[Retry {attempt+1}] Failed to connect {db_key}: {e}")
                   if attempt == retry - 1:
                       print(f"[Error] Giving up on {db_key}")
       self._initialized = True
   def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
       if db_key not in self.pools:
           print(f"[Error] No pool for '{db_key}'")
           return None
       conn = None
       try:
           conn = self.pools[db_key].acquire()
           cursor = conn.cursor()
           cursor.execute(query, params or {})
           if autocommit:
               conn.commit()
           result = cursor.fetchall() if fetch else None
           cursor.close()
           return result
       except cx_Oracle.Error as e:
           print(f"[{db_key}] Query error: {e}")
           return None
       finally:
           if conn:
               self.pools[db_key].release(conn)
   def close_all(self):
       for db_key, pool in self.pools.items():
           pool.close()
           print(f"[Closed] Pool for {db_key}")


  1. 선택사항: 비동기 버전 (asyncio)

class OracleMultiPoolAsync:

   def __init__(self, sync_pool: OracleMultiPool):
       self.sync_pool = sync_pool
   async def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
       loop = asyncio.get_event_loop()
       return await loop.run_in_executor(
           None,
           self.sync_pool.execute,
           db_key,
           query,
           params,
           fetch,
           autocommit
       )
   async def close_all(self):
       loop = asyncio.get_event_loop()
       await loop.run_in_executor(None, self.sync_pool.close_all)


✅ 3. 사용 예제

동기 버전 (main.py)

from oracle_pool import OracleMultiPool

db = OracleMultiPool()

result = db.execute("DB1", "SELECT SYSDATE FROM dual") print("DB1:", result)

  1. 자동 커밋 예 (INSERT/UPDATE)

db.execute("DB2", "UPDATE users SET active = 1 WHERE id = :id", params={"id": 5}, fetch=False, autocommit=True)

db.close_all()


비동기 버전 (main_async.py)

import asyncio from oracle_pool import OracleMultiPool, OracleMultiPoolAsync

async def main():

   sync_db = OracleMultiPool()
   async_db = OracleMultiPoolAsync(sync_db)
   result = await async_db.execute("DB1", "SELECT SYSDATE FROM dual")
   print("Async DB1:", result)
   await async_db.execute("DB2", "UPDATE users SET active = 0 WHERE id = :id", params={"id": 1}, fetch=False, autocommit=True)
   await async_db.close_all()

asyncio.run(main())

Comments