from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Union
from ace.utils import convert_to_dict
[docs]
@dataclass
class APIMessage:
    """Plain data record describing one API call.

    The first six fields are required and are accepted positionally in the
    order declared here; ``message_response`` is optional and starts out as
    an empty dict. This module does not show how the fields are consumed, so
    the per-field notes below are inferred from the names only.
    """

    # NOTE(review): presumably names the auth service used for the call --
    # confirm against the code that consumes this record.
    auth_service: str
    url: str
    # NOTE(review): presumably the HTTP verb ("GET", "POST", ...) -- confirm.
    method: str
    headers: Dict[str, str]
    # May be None, but there is no default: callers must pass it explicitly.
    params: Optional[Dict[str, Any]]
    data: Dict[str, Any]
    # default_factory gives every instance its own fresh dict.
    message_response: Dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class EventMessage:
    """Plain data record for an event addressed by queue and routing key.

    ``queue`` and ``routing_key`` are required; ``request_id`` and ``body``
    are optional and default to ``None``.
    """

    # NOTE(review): the names suggest message-broker addressing -- confirm
    # against the publisher that consumes this record.
    queue: str
    routing_key: str
    request_id: Optional[str] = None
    body: Optional[Dict[str, Any]] = None
[docs]
@dataclass
class ResponseMessage:
    """Tagged wrapper pairing a message type with an optional body.

    ``message_body`` is annotated as an ``APIMessage``, an ``EventMessage``,
    a plain dict, or ``None`` (the default). Nothing is validated at runtime;
    the annotation is informational only.
    """

    # NOTE(review): presumably tells consumers how to interpret
    # ``message_body`` -- the set of valid values is not visible here.
    message_type: str
    message_body: Union["APIMessage", "EventMessage", Dict[str, Any], None] = None
[docs]
@dataclass
class ValidationMessage:
    """Plain data record for one validation entry: a type tag plus its text.

    Both fields are required strings.
    """

    # Shadows the ``type`` builtin inside the class body only; the name is
    # part of the public constructor signature, so it is kept as-is.
    type: str
    message: str
[docs]
class BaseOutput:
    """Mutable container for an operation's result and the messages it emits.

    Unlike the message records in this module, this is a regular class: the
    constructor takes only the output type and the stop flag, and every
    collection attribute starts out empty.

    Attributes:
        outputType: Value passed to the constructor as ``output_type``.
        result: Dict initialised to ``{}`` per instance (the class-level
            default is ``None``).
        output: List of ``ResponseMessage`` items, initially empty.
        validation: List of ``ValidationMessage`` items, initially empty.
        api_request_exception_stop: Flag copied from the constructor.
            NOTE(review): presumably tells the caller to stop when an API
            request raises -- confirm against the code that reads it.
    """

    outputType: str
    result: Optional[Dict[str, Any]] = None
    output: List[ResponseMessage]
    validation: List[ValidationMessage]
    api_request_exception_stop: bool = False

    def __init__(self, output_type: str, api_request_exception_stop: bool = False) -> None:
        """Create an empty output of the given type.

        Args:
            output_type: Stored on the instance as ``outputType``.
            api_request_exception_stop: Stored unchanged on the instance.
        """
        self.api_request_exception_stop = api_request_exception_stop
        self.outputType = output_type
        # Fresh containers per instance so instances never share state.
        self.result = {}
        self.output = []
        self.validation = []

    def as_dict(self):
        """Return this instance as converted by ``convert_to_dict``.

        The conversion is delegated entirely to the imported helper; its
        return shape is not visible here (presumably a plain dict).
        """
        return convert_to_dict(self)