Source code for ace.tools.xml.utils
import xml.etree.ElementTree as ET
import csv
import os
from pathlib import Path
from typing import Dict, List
import logging
from collections import defaultdict
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class XMLExtractor:
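    """
    Extract element paths, text values, and 'tc' attributes from XML files,
    merging the results into a de-duplicated CSV summary.
    """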
    def extract_element_data(self, element, parent_path=""):
        """
        Recursively extract data from XML elements, including 'tc' attributes.
        """
        results = []
        # Build the current path, stripping any namespace prefix from the tag
        tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
        current_path = f"{parent_path}/{tag}" if parent_path else tag
        # Extract the current element's data
        element_data = {
            'path': current_path,
            'text': element.text.strip() if element.text and element.text.strip() else None,
            'tc_value': element.get('tc')
        }
        # Only record elements with meaningful content (text or a tc attribute)
        if element_data['text'] or element_data['tc_value']:
            results.append(element_data)
        # Recurse into child elements
        for child in element:
            results.extend(self.extract_element_data(child, current_path))
        return results
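    # Illustrative example (input is hypothetical): for an element parsed from
    # '<root><item tc="5">hello</item></root>', extract_element_data(root)
    # returns [{'path': 'root/item', 'text': 'hello', 'tc_value': '5'}];
    # the bare <root> element has no text or tc, so no row is emitted for it.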
    def extract_from_file(self, file_path: str) -> List[Dict]:
        """
        Extract all data from a single XML file.
        Returns a list of dictionaries with 'path', 'values', and 'tc' keys.
        """
        try:
            tree = ET.parse(file_path)
            root = tree.getroot()
            # Extract all data
            elements = self.extract_element_data(root)
            # Reshape for CSV output, mapping empty fields to ''
            csv_rows = []
            for element in elements:
                row = {
                    'path': element['path'],
                    'values': element['text'] or '',
                    'tc': element['tc_value'] or ''
                }
                csv_rows.append(row)
            return csv_rows
        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            return []
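    # Illustrative example (file name is hypothetical): with the document above
    # saved as 'sample.xml', extract_from_file('sample.xml') returns
    # [{'path': 'root/item', 'values': 'hello', 'tc': '5'}]; a file that fails
    # to parse is logged and yields [].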
    def read_existing_csv(self, csv_file: str) -> Dict[str, Dict]:
        """
        Read an existing CSV and return a dictionary keyed by path.
        Values and tc are stored as sets to handle multiple occurrences
        without duplicates.
        """
        existing_data = {}
        if not os.path.exists(csv_file):
            return existing_data
        try:
            with open(csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    path = row['path']
                    values = row['values']
                    tc = row['tc']
                    # Parse bracketed lists (format: [val1,val2,val3])
                    if values.startswith('[') and values.endswith(']'):
                        values_set = set(v.strip() for v in values[1:-1].split(',') if v.strip())
                    else:
                        values_set = {values} if values else set()
                    if tc.startswith('[') and tc.endswith(']'):
                        tc_set = set(t.strip() for t in tc[1:-1].split(',') if t.strip())
                    else:
                        tc_set = {tc} if tc else set()
                    existing_data[path] = {
                        'values': values_set,
                        'tc': tc_set
                    }
        except Exception as e:
            logger.error(f"Error reading existing CSV: {str(e)}")
        return existing_data
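    # Illustrative example: a CSV row with path='root/item', values='[a,b]',
    # tc='5' is read back as {'root/item': {'values': {'a', 'b'}, 'tc': {'5'}}}.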
    def merge_data(self, existing_data: Dict, new_rows: List[Dict]) -> List[Dict]:
        """
        Merge new data with existing data, using sets to avoid duplicates.
        """
        # Create a working copy of the existing data using sets
        merged_data = defaultdict(lambda: {'values': set(), 'tc': set()})
        # Copy existing data
        for path, data in existing_data.items():
            merged_data[path]['values'] = data['values'].copy()
            merged_data[path]['tc'] = data['tc'].copy()
        # Add new data (sets automatically drop duplicates)
        for row in new_rows:
            path = row['path']
            value = row['values']
            tc = row['tc']
            # Add to the sets only if non-empty
            if value:
                merged_data[path]['values'].add(value)
            if tc:
                merged_data[path]['tc'].add(tc)
        # Convert back to the flat row format used for CSV output
        result = []
        for path, data in merged_data.items():
            # Sort the sets for deterministic output
            values_list = sorted(data['values'])
            tc_list = sorted(data['tc'])
            # Format as a bracketed list only when there are multiple values
            if len(values_list) > 1:
                values_str = '[' + ','.join(values_list) + ']'
            elif len(values_list) == 1:
                values_str = values_list[0]
            else:
                values_str = ''
            if len(tc_list) > 1:
                tc_str = '[' + ','.join(tc_list) + ']'
            elif len(tc_list) == 1:
                tc_str = tc_list[0]
            else:
                tc_str = ''
            result.append({
                'path': path,
                'values': values_str,
                'tc': tc_str
            })
        # Sort by path for consistent output
        result.sort(key=lambda x: x['path'])
        return result
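    # Illustrative example: merging {'root/item': {'values': {'a'}, 'tc': set()}}
    # with [{'path': 'root/item', 'values': 'b', 'tc': '5'}] yields
    # [{'path': 'root/item', 'values': '[a,b]', 'tc': '5'}].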
    def process_directory(self, directory: str, output_csv: str):
        """
        Process all XML files in a directory and update the CSV file.
        Existing paths are updated with new values/tc entries, stored as lists.
        """
        path = Path(directory)
        if not path.exists():
            logger.error(f"Directory not found: {directory}")
            return
        # Collect all .xml files (case-insensitive extension match)
        xml_files = [f for f in path.iterdir() if f.is_file() and f.suffix.lower() == '.xml']
        if not xml_files:
            logger.warning(f"No XML files found in {directory}")
            return
        processed_files = 0
        # Read existing CSV data
        logger.info(f"Reading existing CSV data from {output_csv}...")
        existing_data = self.read_existing_csv(output_csv)
        initial_paths = len(existing_data)
        # Process each XML file
        all_new_rows = []
        for xml_file in xml_files:
            logger.info(f"Processing {xml_file.name}...")
            csv_data = self.extract_from_file(str(xml_file))
            if csv_data:
                all_new_rows.extend(csv_data)
                logger.info(f" - Extracted {len(csv_data)} rows from {xml_file.name}")
            else:
                logger.warning(f" - No data extracted from {xml_file.name}")
            processed_files += 1
        # Merge new rows into the existing data
        logger.info("Merging data...")
        merged_data = self.merge_data(existing_data, all_new_rows)
        # Write merged data to CSV
        with open(output_csv, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=['path', 'values', 'tc'])
            writer.writeheader()
            writer.writerows(merged_data)
        logger.info("\nSummary:")
        logger.info(f" - Processed {processed_files} XML files")
        logger.info(f" - Initial paths in CSV: {initial_paths}")
        logger.info(f" - Final paths in CSV: {len(merged_data)}")
        logger.info(f" - New unique paths added: {len(merged_data) - initial_paths}")
        logger.info(f" - Output file: {output_csv}")
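# Minimal usage sketch (this guard is not part of the original module; the
# directory and file names are hypothetical):
if __name__ == '__main__':
    extractor = XMLExtractor()
    # Read every .xml file under ./xml_input and merge the extracted rows
    # into output.csv, de-duplicating values per element path.
    extractor.process_directory('xml_input', 'output.csv')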