241 lines
9.1 KiB
Python
241 lines
9.1 KiB
Python
import random
|
|
import string
|
|
import pyodbc
|
|
import uvicorn
|
|
import logging
|
|
import os
|
|
from fastapi import FastAPI, HTTPException, Depends, Query
|
|
from typing import List
|
|
from database import get_connection
|
|
from models import (
|
|
Cliente, SedeConsegna, Articolo, CodiceRicerca,
|
|
MetodoPagamento, DisponibilitaArticolo,
|
|
OrdineTestata, OrdineRiga,Magazzini
|
|
)
|
|
from contants import NOME_PROGETTO, MESSAGGIO_SUCCESSO,API_SECRET_KEY,VALUTA,FL_SCOR,TIPO_RIGA_ART,FL_OMAG
|
|
from datetime import datetime
|
|
|
|
# Log config
|
|
logging.basicConfig(
|
|
filename='api_errors.log',
|
|
level=logging.ERROR, # You can change to DEBUG or INFO as needed
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
app = FastAPI()
|
|
|
|
def genera_codice_random(lunghezza: int = 50) -> str:
|
|
caratteri = string.ascii_letters + string.digits # a-z A-Z 0-9
|
|
return ''.join(random.choices(caratteri, k=lunghezza))
|
|
|
|
def genera_cpccchk(lunghezza: int = 10) -> str:
|
|
lettere = string.ascii_uppercase # solo lettere maiuscole A-Z
|
|
return ''.join(random.choices(lettere, k=lunghezza))
|
|
|
|
def genera_nuovo_codice_cliente():
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT TOP 1 ANCODICE FROM MANGACONTI WHERE ANCODICE LIKE 'IK%' ORDER BY ANCODICE DESC")
|
|
row = cursor.fetchone()
|
|
if row:
|
|
ultimo_codice = row[0].strip() # es. "IK00000001"
|
|
numero = int(ultimo_codice[2:]) # parte numerica dopo "IK"
|
|
nuovo_numero = numero + 1
|
|
else:
|
|
nuovo_numero = 1
|
|
|
|
nuovo_codice = f"IK{nuovo_numero:08d}" # 8 cifre numeriche con zeri davanti
|
|
nuovo_codice = nuovo_codice.ljust(15) # aggiunge spazi fino a 15 caratteri totali
|
|
return nuovo_codice
|
|
|
|
except pyodbc.Error as e:
|
|
# Puoi loggare o rilanciare l'eccezione, o gestirla come vuoi
|
|
raise Exception(f"Errore DB nel generare codice cliente: {str(e)}")
|
|
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def verifica_secret_key(secret: str = Query(..., description="Chiave di accesso")):
|
|
if secret != API_SECRET_KEY:
|
|
raise HTTPException(status_code=403, detail="Accesso negato: chiave non valida")
|
|
|
|
@app.get("/clienti", response_model=List[Cliente])
|
|
def get_clienti(secret: str = Depends(verifica_secret_key)):
|
|
print(NOME_PROGETTO)
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT ANTIPCON, ANCODICE, ANDESCRI, ANPARIVA, ANCODFIS,ANINDIRI FROM MANGACONTI")
|
|
rows = cursor.fetchall()
|
|
return [Cliente(antipcon=r[0],ancodice=r[1], andescri=r[2], anpariva=r[3], ancodfis=r[4],anindiri=r[5]) for r in rows]
|
|
|
|
|
|
@app.get("/sedi-consegna", response_model=List[SedeConsegna])
|
|
def get_sedi_consegna(codice_cliente: str,secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT ddtipcon, ddcodice, ddcoddes,ddnomdes,ddindiri,dd___cap,ddlocali,ddprovin,ddcodnaz FROM MANGADES_DIVE WHERE ddtipcon='C' AND ddcodice = ?", codice_cliente)
|
|
rows = cursor.fetchall()
|
|
return [SedeConsegna(ddtipcon=r[0], ddcodice=r[1], ddcoddes=r[2],ddnomdes=r[3],ddindiri=r[4],dd___cap=r[5],ddlocali=r[6],ddprovin=r[7],ddcodnaz=r[8]) for r in rows]
|
|
|
|
|
|
@app.get("/articoli", response_model=List[Articolo])
|
|
def get_articoli(secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("select ARCODART, ARDESART,ARDESSUP,ARUNMIS1,ARCODIVA from MANGAART_ICOL where ardtobso is null")
|
|
rows = cursor.fetchall()
|
|
return [Articolo(arcodart=r[0], ardesart=r[1], ardessup=r[2], arunmis1=r[3]) for r in rows]
|
|
|
|
|
|
@app.get("/codici-ricerca", response_model=List[CodiceRicerca])
|
|
def get_codici_ricerca(secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("select CACODICE, CADESART,CADESSUP, CACODART from MANGAKEY_ARTI where CADTOBSO is null")
|
|
rows = cursor.fetchall()
|
|
return [CodiceRicerca(cacodice=r[0], cadesart=r[1], cadessup=r[2], cacodart=r[3]) for r in rows]
|
|
|
|
|
|
@app.get("/metodi-pagamento", response_model=List[MetodoPagamento])
|
|
def get_metodi_pagamento(secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("select PACODICE, PADESCRI from MANGAPAG_AMEN where PADTOBSO is null")
|
|
rows = cursor.fetchall()
|
|
return [MetodoPagamento(pacodice=r[0], padescri=r[1]) for r in rows]
|
|
|
|
@app.get("/magazzini", response_model=List[Magazzini])
|
|
def get_magazzini(secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM MANGA_IKTOME_MAGAZ")
|
|
rows = cursor.fetchall()
|
|
return [Magazzini(mgcodmag=r[0], mgdesmag=r[1]) for r in rows]
|
|
|
|
@app.get("/disponibilita", response_model=DisponibilitaArticolo)
|
|
def get_disponibilita(codice_articolo: str,secret: str = Depends(verifica_secret_key)):
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("select SLQTAPER from MANGASALDIART WHERE SLCODART = ?", codice_articolo)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return DisponibilitaArticolo(slcodart=codice_articolo, slqtaper=row[0])
|
|
raise HTTPException(status_code=404, detail="Articolo non trovato")
|
|
|
|
@app.post("/ordini")
|
|
def crea_ordine(ordine: OrdineTestata):
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Se manca codice_cliente e ho dati cliente, creo nuovo cliente
|
|
if not ordine.codice_cliente and ordine.cliente:
|
|
codice_cliente = genera_nuovo_codice_cliente() # metodo che genera codice tipo "IK00000001 "
|
|
cursor.execute("""
|
|
INSERT INTO MANGACONTI (ANTIPCON, ANCODICE, ANDESCRI, ANPARIVA, ANCODFIS, ANINDIRI)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
"C",
|
|
codice_cliente,
|
|
ordine.cliente.andescri,
|
|
ordine.cliente.anpariva,
|
|
ordine.cliente.ancodfis,
|
|
ordine.cliente.anindiri
|
|
))
|
|
conn.commit()
|
|
ordine.codice_cliente = codice_cliente
|
|
|
|
if not ordine.codice_cliente:
|
|
raise HTTPException(status_code=400, detail="Codice cliente mancante e dati cliente non forniti")
|
|
|
|
# Genera codice ordine
|
|
seriale_ordine = genera_codice_random()
|
|
ordine.orserial = seriale_ordine
|
|
cpccchk=genera_cpccchk()
|
|
data_ora_corrente = datetime.now()
|
|
ordine.or_stato=0 #stato iniziale
|
|
|
|
cursor.execute("""
|
|
INSERT INTO MANGAZORDWEBM (orserial, or_stato, ortipdoc, ornumdoc, oralfdoc, ortipcon, orcodcon, orcodage, orcodpag,
|
|
ortotord, ordatdoc,cpccchk,orcodval,orflscor,oralfest,utdc,utdv,orcodpor,or__note,orrifest,orvalnaz,orcaoval,orvalacc)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
ordine.orserial,
|
|
ordine.or_stato,
|
|
ordine.ortipdoc,
|
|
ordine.ornumdoc,
|
|
ordine.oralfdoc,
|
|
ordine.ortipcon,
|
|
ordine.codice_cliente,
|
|
ordine.orcodage,
|
|
ordine.orcodpag,
|
|
ordine.ortotord,
|
|
ordine.ordatdoc,
|
|
cpccchk,
|
|
VALUTA,
|
|
FL_SCOR,
|
|
" ",
|
|
data_ora_corrente,
|
|
data_ora_corrente,
|
|
" ",
|
|
" ",
|
|
" ",
|
|
" ",
|
|
1,
|
|
" "
|
|
))
|
|
#conn.commit()
|
|
cursor.execute("SELECT SCOPE_IDENTITY()")
|
|
id_ordine = cursor.fetchone()[0]
|
|
|
|
# Inserisce righe ordine
|
|
for riga in ordine.righe:
|
|
cproword=riga.cprownum * 10
|
|
cursor.execute("""
|
|
INSERT INTO MANGAZORDWEBD (orserial, cprownum, ordesart, ordessup, orqtamov, orcodart,orcodice,
|
|
cproword,ortiprig,orcodvar,orunimis,orcodiva,orflomag,orcodlis,orcontra,
|
|
orscolis,orprolis,orprosco,cpccchk)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
ordine.orserial,
|
|
riga.cprownum,
|
|
riga.ordesart,
|
|
riga.ordessup,
|
|
riga.orqtamov,
|
|
riga.orcodart,
|
|
riga.orcodice,
|
|
cproword,
|
|
TIPO_RIGA_ART,
|
|
" ",
|
|
riga.orunimis,
|
|
riga.orcodiva,
|
|
FL_OMAG,
|
|
" ",
|
|
" ",
|
|
" ",
|
|
" ",
|
|
" ",
|
|
genera_cpccchk()
|
|
))
|
|
conn.commit()
|
|
|
|
return {"orserial": ordine.orserial, "messaggio": "Ordine creato con successo"}
|
|
|
|
except pyodbc.Error as e:
|
|
conn.rollback()
|
|
logging.error("Errore DB durante la creazione dell'ordine", exc_info=True)
|
|
raise HTTPException(status_code=500, detail=f"Errore DB: {str(e)}")
|
|
|
|
except Exception as e:
|
|
logging.error("Errore generico durante la creazione dell'ordine", exc_info=True)
|
|
raise HTTPException(status_code=500, detail=f"Errore generico: {str(e)}")
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |