montevibiano-industry-coditech/app/db.py
2025-09-26 15:29:31 +02:00

79 lines
2.7 KiB
Python

import asyncio
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
def build_odbc_url(
server: str,
database: str,
username: str | None,
password: str | None,
encrypt: bool = False,
trust_server_certificate: bool = True,
timeout: int = 15,
) -> str:
"""
Build SQLAlchemy ODBC URL for SQL Server using ODBC Driver 18.
Default Encrypt=no to support servers with encryption disabled.
If encrypt=True, we set Encrypt=yes and TrustServerCertificate accordingly.
"""
driver = "ODBC Driver 17 for SQL Server"
odbc_params = {
"DRIVER": driver,
"SERVER": server,
"DATABASE": database,
"Connection Timeout": str(timeout),
}
if encrypt:
odbc_params["Encrypt"] = "yes"
odbc_params["TrustServerCertificate"] = "yes" if trust_server_certificate else "no"
else:
# critical for servers with encryption disabled
odbc_params["Encrypt"] = "no"
if username and password:
odbc_params["UID"] = username
odbc_params["PWD"] = password
else:
# Trusted_Connection can work on Windows/domain; usually not in Linux containers.
odbc_params["Trusted_Connection"] = "yes"
conn_str = ";".join([f"{k}={v}" for k, v in odbc_params.items()])
return f"mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}"
def get_engine(payload) -> Engine:
    """
    Create a pooled SQLAlchemy engine from a connection payload.

    ``payload.server`` and ``payload.database`` are required attributes.
    ``username``, ``password``, ``encrypt``, ``trust_server_certificate`` and
    ``timeout`` are optional and fall back to ``None``, ``None``, ``False``,
    ``True`` and ``15`` respectively when the payload lacks them.
    """
    server = payload.server
    database = payload.database

    # Optional payload attributes paired with the fallback used when absent.
    fallbacks = {
        "username": None,
        "password": None,
        "encrypt": False,
        "trust_server_certificate": True,
        "timeout": 15,
    }
    optional = {
        attr: getattr(payload, attr, fallback)
        for attr, fallback in fallbacks.items()
    }

    odbc_url = build_odbc_url(server=server, database=database, **optional)

    engine_options = {
        "pool_size": 5,
        "max_overflow": 10,
        "pool_pre_ping": True,
        "fast_executemany": True,
    }
    return create_engine(odbc_url, **engine_options)
async def run_query(engine: Engine, sql: str, params: dict, max_rows: int | None = None):
    """
    Execute a SQL statement and return its rows as dictionaries.

    The engine is synchronous, so the blocking connect/execute/fetch work is
    run in a worker thread via ``asyncio.to_thread`` instead of on the event
    loop thread, where it would stall every other coroutine for the duration
    of the query.

    Args:
        engine: Engine used to open the connection.
        sql: Statement text, wrapped in ``text()`` before execution.
        params: Bind parameters passed to ``execute``.
        max_rows: When truthy, at most this many rows are fetched; ``None``
            (or ``0``) fetches every row.

    Returns:
        ``{"columns": [...], "rows": [{column: value, ...}, ...]}``. Both lists
        are empty when the statement does not return rows.
    """

    def _fetch() -> dict:
        with engine.connect() as conn:
            result = conn.execute(text(sql), params)
            if not result.returns_rows:
                return {"columns": [], "rows": []}
            rows = result.fetchmany(max_rows) if max_rows else result.fetchall()
            columns = list(result.keys())
            return {
                "columns": columns,
                "rows": [dict(zip(columns, row)) for row in rows],
            }

    return await asyncio.to_thread(_fetch)
async def run_execute(engine: Engine, sql: str, params: dict, autocommit: bool):
    """
    Execute a SQL statement and report how many rows it affected.

    The blocking database work runs in a worker thread via
    ``asyncio.to_thread`` so the event loop is not stalled while the statement
    executes.

    Args:
        engine: Engine used to open the connection.
        sql: Statement text, wrapped in ``text()`` before execution.
        params: Bind parameters passed to ``execute``.
        autocommit: When true, a plain connection is opened and ``commit()`` is
            called explicitly after the statement; otherwise the statement runs
            inside an ``engine.begin()`` block.

    Returns:
        ``{"rowcount": <result.rowcount>}``.
    """

    def _execute() -> dict:
        # NOTE(review): the autocommit path is "connect, execute, commit", not
        # driver-level AUTOCOMMIT isolation -- confirm that is what callers expect.
        open_connection = engine.connect if autocommit else engine.begin
        with open_connection() as conn:
            result = conn.execute(text(sql), params)
            # Read rowcount while the connection (and its cursor) is still
            # open rather than after the context manager has released it.
            affected = result.rowcount
            if autocommit:
                conn.commit()
        return {"rowcount": affected}

    return await asyncio.to_thread(_execute)