메뉴 여닫기
개인 메뉴 토글
로그인하지 않음
만약 지금 편집한다면 당신의 IP 주소가 공개될 수 있습니다.

Python oracle 커넥션 풀: 두 판 사이의 차이

데브카페
새 문서: 좋습니다! 말씀하신 기능을 모두 포함하는 강화된 버전의 OracleMultiPool 클래스를 아래에 제공합니다. ⸻ ✅ 주요 기능 요약 기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync) ⸻ ✅ 1....
 
편집 요약 없음
 
(같은 사용자의 중간 판 하나는 보이지 않습니다)
1번째 줄: 1번째 줄:
좋습니다! 말씀하신 기능을 모두 포함하는 강화된 버전의 OracleMultiPool 클래스를 아래에 제공합니다.
 


19번째 줄: 19번째 줄:
✅ 1. 설정 파일: db_config.json
✅ 1. 설정 파일: db_config.json


<source lang=js>
{
{
   "DB1": {
   "DB1": {
31번째 줄: 32번째 줄:
   }
   }
}
}
</source>




39번째 줄: 41번째 줄:


동기 + 비동기 + 자동 커밋 + 재시도 지원
동기 + 비동기 + 자동 커밋 + 재시도 지원
 
<source lang=python>
# oracle_pool.py
# oracle_pool.py


146번째 줄: 148번째 줄:
         await loop.run_in_executor(None, self.sync_pool.close_all)
         await loop.run_in_executor(None, self.sync_pool.close_all)


</source>




153번째 줄: 156번째 줄:


동기 버전 (main.py)
동기 버전 (main.py)
 
<source lang=python>
from oracle_pool import OracleMultiPool
from oracle_pool import OracleMultiPool


166번째 줄: 169번째 줄:
db.close_all()
db.close_all()


</source>




171번째 줄: 175번째 줄:


비동기 버전 (main_async.py)
비동기 버전 (main_async.py)
 
<source lang=python>
import asyncio
import asyncio
from oracle_pool import OracleMultiPool, OracleMultiPoolAsync
from oracle_pool import OracleMultiPool, OracleMultiPoolAsync
187번째 줄: 191번째 줄:


asyncio.run(main())
asyncio.run(main())
</source>
[[category:python]]

2025년 5월 21일 (수) 06:34 기준 최신판


✅ 주요 기능 요약

기능 지원 여부 외부 JSON 설정 파일 O (db_config.json) 여러 DB 커넥션 풀 관리 O (key로 구분) 쿼리 실행 후 자동 커넥션 반납 O INSERT/UPDATE 자동 커밋 O (autocommit=True) 재시도 로직 (Pool 생성 실패 시) O 비동기 지원 (asyncio) O (OracleMultiPoolAsync)


✅ 1. 설정 파일: db_config.json

{
  "DB1": {
    "user": "user1",
    "password": "pass1",
    "dsn": "host1:1521/service1"
  },
  "DB2": {
    "user": "user2",
    "password": "pass2",
    "dsn": "host2:1521/service2"
  }
}


✅ 2. 모듈: oracle_pool.py

동기 + 비동기 + 자동 커밋 + 재시도 지원

# oracle_pool.py

import cx_Oracle
import json
import os
import asyncio

class OracleMultiPool:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, config_path='db_config.json', min=2, max=5, increment=1, retry=3):
        if hasattr(self, "_initialized") and self._initialized:
            return

        self.pools = {}
        self._initialized = False

        if not os.path.exists(config_path):
            print(f"[Error] DB config file '{config_path}' not found.")
            return

        try:
            with open(config_path, 'r') as f:
                config = json.load(f)
        except Exception as e:
            print(f"[Error] Failed to load DB config: {e}")
            return

        for db_key, db_info in config.items():
            for attempt in range(retry):
                try:
                    pool = cx_Oracle.SessionPool(
                        user=db_info["user"],
                        password=db_info["password"],
                        dsn=db_info["dsn"],
                        min=min,
                        max=max,
                        increment=increment,
                        encoding="UTF-8"
                    )
                    self.pools[db_key] = pool
                    print(f"[Connected] Pool created for {db_key}")
                    break
                except cx_Oracle.Error as e:
                    print(f"[Retry {attempt+1}] Failed to connect {db_key}: {e}")
                    if attempt == retry - 1:
                        print(f"[Error] Giving up on {db_key}")

        self._initialized = True

    def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
        if db_key not in self.pools:
            print(f"[Error] No pool for '{db_key}'")
            return None

        conn = None
        try:
            conn = self.pools[db_key].acquire()
            cursor = conn.cursor()
            cursor.execute(query, params or {})

            if autocommit:
                conn.commit()

            result = cursor.fetchall() if fetch else None
            cursor.close()
            return result
        except cx_Oracle.Error as e:
            print(f"[{db_key}] Query error: {e}")
            return None
        finally:
            if conn:
                self.pools[db_key].release(conn)

    def close_all(self):
        for db_key, pool in self.pools.items():
            pool.close()
            print(f"[Closed] Pool for {db_key}")


# 선택사항: 비동기 버전 (asyncio)
class OracleMultiPoolAsync:
    def __init__(self, sync_pool: OracleMultiPool):
        self.sync_pool = sync_pool

    async def execute(self, db_key, query, params=None, fetch=True, autocommit=False):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None,
            self.sync_pool.execute,
            db_key,
            query,
            params,
            fetch,
            autocommit
        )

    async def close_all(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.sync_pool.close_all)


✅ 3. 사용 예제

동기 버전 (main.py)

from oracle_pool import OracleMultiPool

db = OracleMultiPool()

result = db.execute("DB1", "SELECT SYSDATE FROM dual")
print("DB1:", result)

# 자동 커밋 예 (INSERT/UPDATE)
db.execute("DB2", "UPDATE users SET active = 1 WHERE id = :id", params={"id": 5}, fetch=False, autocommit=True)

db.close_all()


비동기 버전 (main_async.py)

import asyncio
from oracle_pool import OracleMultiPool, OracleMultiPoolAsync

async def main():
    sync_db = OracleMultiPool()
    async_db = OracleMultiPoolAsync(sync_db)

    result = await async_db.execute("DB1", "SELECT SYSDATE FROM dual")
    print("Async DB1:", result)

    await async_db.execute("DB2", "UPDATE users SET active = 0 WHERE id = :id", params={"id": 1}, fetch=False, autocommit=True)

    await async_db.close_all()

asyncio.run(main())

Comments