#!/usr/bin/env python3
"""scripts/clean_csv.py

Usage:
    python3 scripts/clean_csv.py [input_dir] [--out-dir OUT] [--dry-run]

This script cleans CSV files (trimming whitespace, removing 'NULL'
placeholders, normalizing the column count) and runs simple validations
(UUID-looking columns, date columns).
"""
import argparse
import csv
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# Canonical UUID layout: 8-4-4-4-12 hexadecimal digits.
UUID_RE = re.compile(r'^[0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$')


def is_uuid(s: str) -> bool:
    return bool(UUID_RE.match(s))
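# For example, is_uuid('123e4567-e89b-12d3-a456-426614174000') is True, while
# 'not-a-uuid' and a string of 36 dashes are both rejected.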


def looks_like_date(s: str) -> bool:
    if not s:
        return False
    # Try ISO-like detection via the standard-library parser.
    try:
        datetime.fromisoformat(s)
        return True
    except Exception:
        return False
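# Note: exactly what fromisoformat() accepts depends on the Python version
# (3.11 widened it to most ISO 8601 strings). On any supported version:
#   looks_like_date('2024-05-17')          -> True
#   looks_like_date('2024-05-17T10:30:00') -> True
#   looks_like_date('17/05/2024')          -> False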


def clean_csv_file(file_path: Path, out_path: Optional[Path] = None, dry_run: bool = False):
    cleaned_rows = []
    errors = []
    with file_path.open("r", encoding="utf-8-sig", newline="") as infile:
        reader = csv.reader(infile)
        try:
            header = next(reader)
        except StopIteration:
            return {'file': str(file_path), 'error': 'empty', 'errors': []}
        header = [h.strip() for h in header]
        nb_cols = len(header)
        cleaned_rows.append(header)
        # Validation heuristics: 'id' or '*_id' headers should hold UUIDs;
        # date-ish headers contain 'date', '_le' (French-style), or '_at'.
        uuid_cols = [i for i, h in enumerate(header) if h.lower() == 'id' or h.lower().endswith('_id')]
        date_cols = [i for i, h in enumerate(header) if any(k in h.lower() for k in ('date', '_le', '_at'))]
        # i is the 1-based file line number (the header is line 1).
        for i, row in enumerate(reader, start=2):
            # Normalize the column count: pad short rows, truncate long ones.
            if len(row) < nb_cols:
                row.extend([""] * (nb_cols - len(row)))
            elif len(row) > nb_cols:
                row = row[:nb_cols]
            # Cell-by-cell cleanup: trim whitespace and blank out cells that
            # are exactly the 'NULL' placeholder (substring matches are kept).
            row = ['' if cell.strip() == 'NULL' else cell.strip() for cell in row]
            # Simple validations
            for ci in uuid_cols:
                if ci < len(row) and row[ci] and not is_uuid(row[ci]):
                    errors.append(f"Line {i}: column {header[ci]} does not look like a UUID: {row[ci]!r}")
            for ci in date_cols:
                if ci < len(row) and row[ci] and not looks_like_date(row[ci]):
                    errors.append(f"Line {i}: column {header[ci]} is not an ISO-like date: {row[ci]!r}")
            cleaned_rows.append(row)
    # Write the output unless --dry-run was requested
    if not dry_run:
        target = out_path if out_path else file_path
        with target.open('w', encoding='utf-8', newline='') as outfile:
            writer = csv.writer(outfile)
            writer.writerows(cleaned_rows)
    return {'file': str(file_path), 'errors': errors}
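# Note: on success the dict is {'file': <path>, 'errors': [<messages>]}; an
# extra 'error': 'empty' key flags files with no header row. main() below
# builds its report from exactly these fields.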


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir', nargs='?', default='bdd/data_test')
    parser.add_argument('--out-dir', '-o', help='Optional output directory for cleaned files')
    parser.add_argument('--dry-run', action='store_true', help='Do not write files, just report')
    args = parser.parse_args()

    base_dir = Path(args.input_dir)
    out_dir = Path(args.out_dir) if args.out_dir else None
    if out_dir:
        out_dir.mkdir(parents=True, exist_ok=True)

    results = []
    for file_path in sorted(base_dir.glob('*.csv')):
        out_path = out_dir / file_path.name if out_dir else None
        res = clean_csv_file(file_path, out_path, args.dry_run)
        results.append(res)

    # Report
    any_errors = False
    for r in results:
        if 'error' in r:
            print(f"[WARN] {r['file']}: {r['error']}")
        if r.get('errors'):
            any_errors = True
            print(f"[ERR] {r['file']} ->")
            for e in r['errors'][:20]:  # cap the output at 20 messages per file
                print(' -', e)
        else:
            print(f"[OK] {r['file']}")

    if any_errors:
        print('\nSome files have validation problems. Fix them, or rerun with --dry-run to inspect.')
        sys.exit(2)
    else:
        print('\nAll files were cleaned successfully.')


if __name__ == '__main__':
    main()