Files
auto-archiver-api/src/db/database.py
2025-02-07 17:57:02 +00:00

58 lines
1.7 KiB
Python

from functools import lru_cache
from sqlalchemy import Engine, create_engine, event
from sqlalchemy.orm import sessionmaker
from shared.settings import get_settings
from contextlib import asynccontextmanager, contextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker
@lru_cache
def make_engine(database_url: str):
    """Create the synchronous engine for ``database_url``.

    Results are memoised by ``lru_cache``, so calling this again with the
    same URL returns the engine built the first time. A "connect" listener
    is attached that runs ``PRAGMA journal_mode=WAL`` on every connection
    handed to it.
    """
    db_engine = create_engine(
        database_url,
        connect_args={"check_same_thread": False},
    )

    def set_sqlite_pragma(conn, _) -> None:
        # Run the pragma on the connection given to the listener, then
        # release the cursor that was opened for it.
        pragma_cursor = conn.cursor()
        pragma_cursor.execute("PRAGMA journal_mode=WAL")
        pragma_cursor.close()

    event.listen(db_engine, "connect", set_sqlite_pragma)
    return db_engine
def make_session_local(engine: Engine):
    """Return a ``sessionmaker`` bound to ``engine``.

    The factory is configured with ``autocommit=False`` and
    ``autoflush=False``.
    """
    return sessionmaker(bind=engine, autoflush=False, autocommit=False)
@contextmanager
def get_db():
    """Context manager yielding a session on the configured database.

    The engine is obtained from ``make_engine`` using the ``DATABASE_PATH``
    setting; the session is closed when the ``with`` block exits, whether
    normally or through an exception.
    """
    db_path = get_settings().DATABASE_PATH
    session_factory = make_session_local(make_engine(db_path))
    db_session = session_factory()
    try:
        yield db_session
    finally:
        db_session.close()
def get_db_dependency():
    """Generator form of ``get_db``, intended for use with ``Depends``.

    Wrapping ``get_db`` in a ``with`` block ensures the session is closed
    properly once the generator is finished.
    """
    with get_db() as session:
        yield session
# ASYNC connections
async def make_async_engine(database_url: str) -> AsyncEngine:
    """Create an ``AsyncEngine`` for ``database_url``.

    ``check_same_thread=False`` is passed through as a connect argument.
    """
    return create_async_engine(
        database_url,
        connect_args={"check_same_thread": False},
    )
async def make_async_session_local(
    engine: AsyncEngine,
) -> "async_sessionmaker[AsyncSession]":
    """Return an ``async_sessionmaker`` bound to ``engine``.

    The return value is a session *factory*, not a session: call it to
    obtain an ``AsyncSession``. The factory is configured with
    ``expire_on_commit=False``, ``autoflush=False`` and ``autocommit=False``.

    Args:
        engine: The async engine the produced sessions will use.

    Returns:
        The configured ``async_sessionmaker``.
    """
    # The annotation is a string so it is not evaluated at definition time.
    return async_sessionmaker(
        engine,
        expire_on_commit=False,
        autoflush=False,
        autocommit=False,
    )
@asynccontextmanager
async def get_db_async():
    """Async context manager yielding an ``AsyncSession``.

    A fresh async engine is created from the ``ASYNC_DATABASE_PATH`` setting
    on every use, a session is opened on it and yielded, and on exit the
    session is closed first and the engine disposed afterwards.

    Yields:
        The session opened from the freshly built session factory.
    """
    engine = await make_async_engine(get_settings().ASYNC_DATABASE_PATH)
    try:
        async_session = await make_async_session_local(engine)
        async with async_session() as session:
            yield session
    finally:
        # The ``finally`` sits outside the ``async with`` so the session is
        # closed before the engine is disposed, and so the engine is still
        # disposed if building the factory or opening the session fails.
        await engine.dispose()