메뉴 여닫기
개인 메뉴 토글
로그인하지 않음
만약 지금 편집한다면 당신의 IP 주소가 공개될 수 있습니다.

파이썬 오라클 커넥션 풀

데브카페

파이썬 오라클 커넥션 풀 확장

  • get_connection_with_retry() 메서드를 클래스 내부로 통합
  • fetch_data() 메서드 추가 (SELECT 결과 반환)
  • 전체 구조 정돈 및 예외처리 강화


oracle_connection_manager.py

import json
import time
import cx_Oracle

class OracleConnectionManager:
    def __init__(self, config_file, max_retries=3, retry_delay=2):
        self.pools = {}
        self.connections = {}
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.load_config(config_file)
        self.create_pools()

    def load_config(self, config_file):
        with open(config_file, 'r') as file:
            self.db_config = json.load(file)["Database"]

    def create_pools(self):
        for db in self.db_config:
            dsn = cx_Oracle.makedsn(db["Host"], db["Port"], service_name=db["ServiceName"])
            pool = cx_Oracle.SessionPool(
                user=db["Username"],
                password=db["Password"],
                dsn=dsn,
                min=1,
                max=5,
                increment=1,
                threaded=True,
                getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
            )
            self.pools[db["Name"]] = pool
            self.connections[db["Name"]] = None

    def get_connection(self, db_name):
        """Get or acquire a new connection with retry logic."""
        if self.connections[db_name] is not None:
            print(f" Using existing connection for {db_name}")
            return self.connections[db_name]

        attempt = 0
        while attempt < self.max_retries:
            try:
                conn = self.pools[db_name].acquire()
                self.connections[db_name] = conn
                print(f" Connection acquired on attempt {attempt + 1} for {db_name}")
                return conn
            except cx_Oracle.DatabaseError as e:
                err, = e.args
                print(f" Attempt {attempt + 1} failed for {db_name}: {err.message}")
                attempt += 1
                time.sleep(self.retry_delay)
        raise Exception(f" Unable to acquire DB connection for {db_name} after {self.max_retries} attempts.")

    def release_connection(self, db_name):
        """Release the connection back to the pool."""
        if self.connections[db_name]:
            self.pools[db_name].release(self.connections[db_name])
            self.connections[db_name] = None
            print(f" Connection released for {db_name}")

    def fetch_data(self, db_name, query, params=None):
        """Execute SELECT query and return results."""
        conn = self.get_connection(db_name)
        cursor = conn.cursor()
        try:
            cursor.execute(query, params or {})
            return cursor.fetchall()
        finally:
            cursor.close()

    def close_all(self):
        """Close all pools and connections."""
        for db_name in self.pools:
            if self.connections[db_name]:
                self.release_connection(db_name)
            self.pools[db_name].close()
            print(f" Pool closed for {db_name}")


사용 예시


if __name__ == "__main__":
    manager = OracleConnectionManager("db_config.json")

    try:
        rows = manager.fetch_data("DB1", "SELECT empno, ename FROM emp WHERE deptno = :dept", {"dept": 10})
        for row in rows:
            print(row)
    finally:
        manager.close_all()

추가 개선 사항

  • execute_update() 메서드 추가 (INSERT, UPDATE 등 트랜잭션 처리 포함)
  • asyncio 기반 비동기 구조 전환
  • 커넥션 상태 모니터링 (예: 자동 재연결 감지)

Comments