Source code for ace.tools.xml.utils

import xml.etree.ElementTree as ET
import csv
import os
from pathlib import Path
from typing import Dict, List
import logging
from collections import defaultdict

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class XMLExtractor:
    """
    Extracts element paths, text values, and 'tc' attributes from XML files
    and aggregates them into a CSV, merging with any existing CSV data.
    """
    def extract_element_data(self, element, parent_path=""):
        """
        Recursively extract data from XML elements, including tc attributes.
        """
        results = []

        # Build the current path, stripping any XML namespace from the tag
        tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
        current_path = f"{parent_path}/{tag}" if parent_path else tag

        # Extract the current element's data
        element_data = {
            'path': current_path,
            'text': element.text.strip() if element.text and element.text.strip() else None,
            'tc_value': element.get('tc')
        }

        # Only keep elements with meaningful content (text or a tc attribute)
        if element_data['text'] or element_data['tc_value']:
            results.append(element_data)

        # Recurse into child elements
        for child in element:
            results.extend(self.extract_element_data(child, current_path))

        return results
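
    # Illustrative example (not from the original source): for a tree parsed
    # from '<root><item tc="5">hello</item></root>', extract_element_data(root)
    # yields
    #   [{'path': 'root/item', 'text': 'hello', 'tc_value': '5'}]
    # The bare <root> element is skipped because it has neither text nor a
    # tc attribute.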
    def extract_from_file(self, file_path: str) -> List[Dict]:
        """
        Extract all data from a single XML file.
        Returns a list of dictionaries with path, values, and tc.
        """
        try:
            tree = ET.parse(file_path)
            root = tree.getroot()

            # Extract all element data
            elements = self.extract_element_data(root)

            # Format for CSV output
            csv_rows = []
            for element in elements:
                row = {
                    'path': element['path'],
                    'values': element['text'] or '',
                    'tc': element['tc_value'] or ''
                }
                csv_rows.append(row)

            return csv_rows

        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            return []
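
    # Illustrative example: with the XML above saved as 'sample.xml' (a
    # hypothetical file name), extract_from_file('sample.xml') returns
    #   [{'path': 'root/item', 'values': 'hello', 'tc': '5'}]
    # and an empty list if parsing fails.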
    def read_existing_csv(self, csv_file: str) -> Dict[str, Dict]:
        """
        Read an existing CSV and return a dictionary keyed by path.
        Values and tc are stored as sets to handle multiple occurrences
        without duplicates.
        """
        existing_data = {}

        if not os.path.exists(csv_file):
            return existing_data

        try:
            with open(csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    path = row['path']
                    values = row['values']
                    tc = row['tc']

                    # Parse existing lists (format: [val1,val2,val3])
                    if values.startswith('[') and values.endswith(']'):
                        values_set = set(v.strip() for v in values[1:-1].split(',') if v.strip())
                    else:
                        values_set = {values} if values else set()

                    if tc.startswith('[') and tc.endswith(']'):
                        tc_set = set(t.strip() for t in tc[1:-1].split(',') if t.strip())
                    else:
                        tc_set = {tc} if tc else set()

                    existing_data[path] = {
                        'values': values_set,
                        'tc': tc_set
                    }
        except Exception as e:
            logger.error(f"Error reading existing CSV: {str(e)}")

        return existing_data
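
    # Illustrative example: under the 'path,values,tc' header, a data row like
    #   root/item,"[a,b]",7
    # is read back as {'root/item': {'values': {'a', 'b'}, 'tc': {'7'}}}.
    # Note this bracket format cannot round-trip individual values that
    # themselves contain commas or brackets.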
    def merge_data(self, existing_data: Dict, new_rows: List[Dict]) -> List[Dict]:
        """
        Merge new data with existing data, using sets to avoid duplicates.
        """
        # Create a working copy of the existing data using sets
        merged_data = defaultdict(lambda: {'values': set(), 'tc': set()})

        # Copy existing data
        for path, data in existing_data.items():
            merged_data[path]['values'] = data['values'].copy()
            merged_data[path]['tc'] = data['tc'].copy()

        # Add new data (sets automatically handle duplicates)
        for row in new_rows:
            path = row['path']
            value = row['values']
            tc = row['tc']

            # Only add non-empty values
            if value:
                merged_data[path]['values'].add(value)
            if tc:
                merged_data[path]['tc'].add(tc)

        # Convert back to the list format used for CSV output
        result = []
        for path, data in merged_data.items():
            # Sort sets for consistent output
            values_list = sorted(data['values'])
            tc_list = sorted(data['tc'])

            # Format as a bracketed list when there are multiple values
            if len(values_list) > 1:
                values_str = '[' + ','.join(values_list) + ']'
            elif len(values_list) == 1:
                values_str = values_list[0]
            else:
                values_str = ''

            if len(tc_list) > 1:
                tc_str = '[' + ','.join(tc_list) + ']'
            elif len(tc_list) == 1:
                tc_str = tc_list[0]
            else:
                tc_str = ''

            result.append({
                'path': path,
                'values': values_str,
                'tc': tc_str
            })

        # Sort by path for consistent output
        result.sort(key=lambda x: x['path'])
        return result
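
    # Illustrative example: merging existing data
    #   {'root/item': {'values': {'a'}, 'tc': set()}}
    # with new rows [{'path': 'root/item', 'values': 'b', 'tc': '7'}] produces
    #   [{'path': 'root/item', 'values': '[a,b]', 'tc': '7'}]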
    def process_directory(self, directory: str, output_csv: str):
        """
        Process all XML files in a directory and update the CSV file.
        Existing paths are updated with new values/tc as lists.
        """
        path = Path(directory)
        if not path.exists():
            logger.error(f"Directory not found: {directory}")
            return

        # Collect all .xml files (case-insensitive extension match)
        xml_files = [f for f in path.iterdir() if f.is_file() and f.suffix.lower() == '.xml']

        if not xml_files:
            logger.warning(f"No XML files found in {directory}")
            return

        processed_files = 0

        # Read existing CSV data
        logger.info(f"Reading existing CSV data from {output_csv}...")
        existing_data = self.read_existing_csv(output_csv)
        initial_paths = len(existing_data)

        # Process each XML file
        all_new_rows = []
        for xml_file in xml_files:
            logger.info(f"Processing {xml_file.name}...")
            csv_data = self.extract_from_file(str(xml_file))
            if csv_data:
                all_new_rows.extend(csv_data)
                logger.info(f" - Extracted {len(csv_data)} rows from {xml_file.name}")
            else:
                logger.warning(f" - No data extracted from {xml_file.name}")
            processed_files += 1

        # Merge all data
        logger.info("Merging data...")
        merged_data = self.merge_data(existing_data, all_new_rows)

        # Write the merged data to CSV
        with open(output_csv, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=['path', 'values', 'tc'])
            writer.writeheader()
            writer.writerows(merged_data)

        logger.info("\nSummary:")
        logger.info(f" - Processed {processed_files} XML files")
        logger.info(f" - Initial paths in CSV: {initial_paths}")
        logger.info(f" - Final paths in CSV: {len(merged_data)}")
        logger.info(f" - New unique paths added: {len(merged_data) - initial_paths}")
        logger.info(f" - Output file: {output_csv}")
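

# Minimal usage sketch, not part of the original module; the directory and
# output paths below are illustrative assumptions.
if __name__ == "__main__":
    extractor = XMLExtractor()
    extractor.process_directory("data/xml_exports", "extracted_data.csv")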