Source code for ace.utils

import re
import logging
from typing import Any, Optional
import uuid
import json
import datetime
from decimal import Decimal


[docs] class BaseOutput: """this class defines the output objects that the rules engine can return"""
[docs] def __init__(self, output_type, result=None, validation=None, api_request_exception_stop=False): self.api_request_exception_stop = api_request_exception_stop self.outputType = output_type self.result = result self.validation = validation
[docs] def as_dict(self): return convert_to_dict(self)
[docs] class JsonEncoder(json.JSONEncoder):
[docs] def default(self, obj: Any) -> Any: if isinstance(obj, uuid.UUID): return obj.hex if isinstance(obj, Decimal): return float(obj) if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() if hasattr(obj, "__dict__"): return obj.__dict__ return json.JSONEncoder.default(self, obj)
[docs] def try_parsing_date(possible_date): """ Try to parse a date using several formats, warn about problematic value if the possible_date does not match any of the formats tried """ for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%d/%m/%Y", "%m/%d/%Y", "%Y-%m-%d"): try: return datetime.datetime.strptime(possible_date, fmt) except ValueError: pass raise ValueError(f"Non-valid date format for '{possible_date}'")
[docs] def profile(func): from functools import wraps @wraps(func) def wrapper(*args, **kwargs): from line_profiler import LineProfiler prof = LineProfiler() try: return prof(func)(*args, **kwargs) finally: prof.print_stats() return wrapper
[docs] def convert_to_dict(obj): if isinstance(obj, dict): return {key: convert_to_dict(value) for key, value in obj.items()} elif hasattr(obj, '__dict__'): return convert_to_dict(obj.__dict__) elif isinstance(obj, list): return [convert_to_dict(item) for item in obj] elif isinstance(obj, uuid.UUID): return obj.hex elif isinstance(obj, Decimal): return float(obj) elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() else: return obj
[docs] def is_positive_integer(value): return isinstance(value, int)
[docs] def is_valid_sort_order(value): return value.upper() in ["ASC", "DESC"]
[docs] def ensure_list(value): """Convert a single item to a list containing that item, or return the original if already a list.""" if value is None: return [] elif isinstance(value, list): return value else: return [value]
[docs] def get_text_value(xml_dict, key): """Extract text value from XML dictionary regardless of structure.""" if key not in xml_dict: return None value = xml_dict[key] # If it's a simple string if isinstance(value, str): return value # If it's a dictionary with #text elif isinstance(value, dict) and "#text" in value: return value["#text"] # Handle other cases or return None if not found return None
[docs] def bool_to_char(value: Optional[bool]): if value is None: return None if value: return "Y" return "N"
[docs] def get(dictionary: dict, *keys: str): if dictionary is None: return None current = dictionary for key in keys: current = current.get(key, None) if current is None: return None return current
[docs] def get_array(dictionary: dict, *keys: str): return get(dictionary, *keys) or []
[docs] def sanitize_json(json_string: str): """Clean common JSON syntax errors""" # Remove extra commas before closing braces/brackets json_string = re.sub(r',\s*}', '}', json_string) json_string = re.sub(r',\s*]', ']', json_string) # Remove commas right after opening braces/brackets json_string = re.sub(r'{\s*,', '{', json_string) json_string = re.sub(r'\[\s*,', '[', json_string) # Remove multiple consecutive commas json_string = re.sub(r',\s*,+', ',', json_string) # Remove whitespace around braces and brackets json_string = re.sub(r'\s*{\s*', '{', json_string) json_string = re.sub(r'\s*}\s*', '}', json_string) return json_string
[docs] def safe_json_parse(payload: str): """Parse JSON with automatic cleaning""" try: return json.loads(payload) except json.JSONDecodeError: try: parsed_payload = sanitize_json(payload) return json.loads(parsed_payload) except json.JSONDecodeError as e: logging.debug(f"Could not parse JSON payload: {e}") return None