#!/usr/bin/env python3
"""scripts/clean_csv.py

Usage:
    python3 scripts/clean_csv.py [input_dir] [--out-dir OUT] [--dry-run]

Cleans CSV files (trims cells, blanks 'NULL' cells, normalizes each row to
the header's column count) and performs simple validations (columns that
look like UUIDs, date columns).
"""

import argparse
import csv
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# Strict canonical UUID shape: 8-4-4-4-12 hex digits separated by hyphens.
# (The previous pattern, ^[0-9a-fA-F-]{36}$, accepted ANY 36 characters
# drawn from hex digits and hyphens — e.g. a string of 36 hyphens.)
UUID_RE = re.compile(
    r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}'
    r'-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
)


def is_uuid(s: str) -> bool:
    """Return True if *s* has the canonical 8-4-4-4-12 UUID shape."""
    return bool(UUID_RE.match(s))


def looks_like_date(s: str) -> bool:
    """Return True if *s* parses as an ISO-like date/datetime string."""
    if not s:
        return False
    try:
        datetime.fromisoformat(s)
        return True
    except ValueError:
        # fromisoformat raises ValueError on malformed input; a broad
        # `except Exception` would also hide real bugs (e.g. TypeError).
        return False


def clean_csv_file(file_path: Path, out_path: Optional[Path] = None,
                   dry_run: bool = False) -> dict:
    """Clean one CSV file and collect simple validation errors.

    Cleaning: strips whitespace from every cell, blanks cells that are
    exactly 'NULL', and pads/truncates each row to the header's width.
    Validation heuristics: columns named 'id' or ending in '_id' must
    look like UUIDs; columns whose name contains 'date', '_le' or '_at'
    must parse as ISO-like dates.

    :param file_path: CSV file to read (UTF-8, BOM tolerated).
    :param out_path: where to write the cleaned file; defaults to
        overwriting ``file_path``.
    :param dry_run: when True, report only — nothing is written.
    :return: ``{'file': ..., 'errors': [...]}``; an empty file adds
        ``'error': 'empty'``.
    """
    cleaned_rows = []
    errors = []
    with file_path.open("r", encoding="utf-8-sig", newline="") as infile:
        reader = csv.reader(infile)
        try:
            header = next(reader)
        except StopIteration:
            return {'file': str(file_path), 'error': 'empty', 'errors': []}
        nb_cols = len(header)
        # Strip the header BEFORE computing the heuristics below, so a
        # padded name like ' id ' is still validated as a UUID column.
        header = [h.strip() for h in header]
        cleaned_rows.append(header)

        # Heuristics for the validations.
        uuid_cols = [i for i, h in enumerate(header)
                     if h.lower() == 'id' or h.lower().endswith('_id')]
        date_cols = [i for i, h in enumerate(header)
                     if any(k in h.lower() for k in ('date', '_le', '_at'))]

        for i, row in enumerate(reader, start=2):
            # Normalize the row to the header's width.
            if len(row) < nb_cols:
                row.extend([""] * (nb_cols - len(row)))
            elif len(row) > nb_cols:
                row = row[:nb_cols]

            # Cell-by-cell cleanup.  Only a whole-cell 'NULL' is blanked:
            # the previous substring replace corrupted real values
            # (e.g. 'NULLIFY' -> 'IFY').
            row = [cell.strip() for cell in row]
            row = ['' if cell == 'NULL' else cell for cell in row]

            # Simple validations.  Rows are already padded/truncated to
            # nb_cols, so every ci is in range.
            for ci in uuid_cols:
                if row[ci] and not is_uuid(row[ci]):
                    errors.append(f"Ligne {i} : colonne {header[ci]} ne ressemble pas à un UUID : {row[ci]!r}")
            for ci in date_cols:
                if row[ci] and not looks_like_date(row[ci]):
                    errors.append(f"Ligne {i} : colonne {header[ci]} n'est pas une date ISO-like : {row[ci]!r}")
            cleaned_rows.append(row)

    # Write the output unless --dry-run.
    if not dry_run:
        target = out_path if out_path else file_path
        with target.open('w', encoding='utf-8', newline='') as outfile:
            writer = csv.writer(outfile)
            writer.writerows(cleaned_rows)

    return {'file': str(file_path), 'errors': errors}


def main() -> None:
    """CLI entry point: clean every *.csv in input_dir and print a report."""
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir', nargs='?', default='bdd/data_test')
    parser.add_argument('--out-dir', '-o',
                        help='Optional output directory for cleaned files')
    parser.add_argument('--dry-run', action='store_true',
                        help='Do not write files, just report')
    args = parser.parse_args()

    base_dir = Path(args.input_dir)
    out_dir = Path(args.out_dir) if args.out_dir else None
    if out_dir:
        # exist_ok avoids the exists()/mkdir() race of the original.
        out_dir.mkdir(parents=True, exist_ok=True)

    results = []
    for file_path in sorted(base_dir.glob('*.csv')):
        out_path = out_dir / file_path.name if out_dir else None
        results.append(clean_csv_file(file_path, out_path, args.dry_run))

    # Report.
    any_errors = False
    for r in results:
        if 'error' in r:
            print(f"[AVERT] {r['file']}: {r['error']}")
        if r.get('errors'):
            any_errors = True
            print(f"[ERR] {r['file']} ->")
            for e in r['errors'][:20]:
                print(' -', e)
        else:
            print(f"[OK] {r['file']}")

    if any_errors:
        print('\nCertains fichiers présentent des problèmes de validation. Corrigez-les ou lancez avec --dry-run pour inspecter.')
        sys.exit(2)
    else:
        print('\nTous les fichiers ont été nettoyés avec succès.')


if __name__ == '__main__':
    main()