Source code for ace.centralize.notification_payload

import uuid
from datetime import datetime, date
from typing import List, Any, Optional
from dateutil import parser


# ================================= Microservice Notification Payload Models =========================================

# =========== Billing Service Notification ================

class Entity:
    """Single entry of a payment request's ``entities`` list.

    Constructor arguments use snake_case; the values are exposed as
    camelCase attributes (``totalAmount``, ``entityType``, ``entityId``).
    """

    def __init__(
        self,
        total_amount: Optional[float] = None,
        entity_type: Optional[str] = None,
        entity_id: Optional[int] = None,
    ):
        """Store the given values; every argument defaults to ``None``.

        Args:
            total_amount: Stored as ``totalAmount``.
            entity_type: Stored as ``entityType``.
            entity_id: Stored as ``entityId``.
        """
        self.totalAmount = total_amount
        self.entityType = entity_type
        self.entityId = entity_id
class PaymentRequest:
    """Payment request data attached to a billing notification.

    Constructor arguments use snake_case; the values are exposed as
    camelCase attributes. Note that ``payment_request_id`` is stored as
    ``id``.
    """

    def __init__(
        self,
        payment_request_id: Optional[int] = None,
        account_id: Optional[int] = None,
        status_id: Optional[int] = None,
        request_id: Optional[str] = None,
        currency_id: Optional[int] = None,
        gateway_payment_request_id: Optional[str] = None,
        change_status_datetime_utc: Optional[datetime] = None,
        payment_request_datetime_utc: Optional[datetime] = None,
        creation_datetime_utc: Optional[datetime] = None,
        creation_user_id: Optional[int] = None,
        entities: Optional[List[Entity]] = None,
        total_amount: Optional[float] = None,
    ):
        """Store the given values; every argument defaults to ``None``.

        Args:
            payment_request_id: Stored as ``id``.
            account_id: Stored as ``accountId``.
            status_id: Stored as ``statusId``.
            request_id: Stored as ``requestId``.
            currency_id: Stored as ``currencyId``.
            gateway_payment_request_id: Stored as ``gatewayPaymentRequestId``.
            change_status_datetime_utc: Stored as ``changeStatusDateTimeUTC``.
            payment_request_datetime_utc: Stored as
                ``paymentRequestDateTimeUTC``.
            creation_datetime_utc: Stored as ``creationDateTimeUTC``.
            creation_user_id: Stored as ``creationUserId``.
            entities: Stored as ``entities`` (the same list object, not a
                copy).
            total_amount: Stored as ``totalAmount``.
        """
        # Identification and status.
        self.id = payment_request_id
        self.accountId = account_id
        self.statusId = status_id
        self.requestId = request_id
        self.currencyId = currency_id
        self.gatewayPaymentRequestId = gateway_payment_request_id

        # Timestamps and creator.
        self.changeStatusDateTimeUTC = change_status_datetime_utc
        self.paymentRequestDateTimeUTC = payment_request_datetime_utc
        self.creationDateTimeUTC = creation_datetime_utc
        self.creationUserId = creation_user_id

        # Entities and overall amount.
        self.entities = entities
        self.totalAmount = total_amount
class BillingNotification:
    """Notification payload model for the billing service.

    Constructor arguments use snake_case; the values are exposed as
    camelCase attributes. Note that ``notification_id`` is stored as ``id``.
    """

    def __init__(
        self,
        notification_id: Optional[str] = None,
        request_id: Optional[str] = None,
        payment_request_id: Optional[int] = None,
        payment_request_event_type_id: Optional[int] = None,
        payload: Optional[str] = None,
        gateway_event_id: Optional[str] = None,
        gateway_creation_datetime_utc: Optional[datetime] = None,
        creation_datetime_utc: Optional[datetime] = None,
        payment_processed_datetime_utc: Optional[datetime] = None,
        payment_request: Optional[PaymentRequest] = None,
    ):
        """Store the given values; every argument defaults to ``None``.

        Args:
            notification_id: Stored as ``id``.
            request_id: Stored as ``requestId``.
            payment_request_id: Stored as ``paymentRequestId``.
            payment_request_event_type_id: Stored as
                ``paymentRequestEventTypeId``.
            payload: Stored as ``payload``.
            gateway_event_id: Stored as ``gatewayEventId``.
            gateway_creation_datetime_utc: Stored as
                ``gatewayCreationDateTimeUTC``.
            creation_datetime_utc: Stored as ``creationDateTimeUTC``.
            payment_processed_datetime_utc: Stored as
                ``paymentProcessedDateTimeUTC``.
            payment_request: Stored as ``paymentRequest``.
        """
        # Notification identity and raw content.
        self.id = notification_id
        self.requestId = request_id
        self.paymentRequestId = payment_request_id
        self.paymentRequestEventTypeId = payment_request_event_type_id
        self.payload = payload
        self.gatewayEventId = gateway_event_id

        # Timestamps.
        self.gatewayCreationDateTimeUTC = gateway_creation_datetime_utc
        self.creationDateTimeUTC = creation_datetime_utc
        self.paymentProcessedDateTimeUTC = payment_processed_datetime_utc

        # Nested payment request model (may be None).
        self.paymentRequest = payment_request
class ApplicationCreation:
    """Notification payload model for an application creation.

    All three constructor arguments are required and are exposed as
    camelCase attributes.
    """

    def __init__(
        self,
        application_id: int,
        submitted_date: date,
        creation_user_id: int,
    ):
        """Store the given values.

        Args:
            application_id: Stored as ``applicationId``.
            submitted_date: Stored as ``submittedDate``.
            creation_user_id: Stored as ``creationUserId``.
        """
        self.applicationId = application_id
        self.submittedDate = submitted_date
        self.creationUserId = creation_user_id
class AgentCreation:
    """Notification payload model for an agent creation.

    All three constructor arguments are required and are exposed as
    camelCase attributes.
    """

    def __init__(
        self,
        agent_id: int,
        status_id: int,
        agent_number: str,
    ):
        """Store the given values.

        Args:
            agent_id: Stored as ``agentId``.
            status_id: Stored as ``statusId``.
            agent_number: Stored as ``agentNumber``.
        """
        self.agentId = agent_id
        self.statusId = status_id
        self.agentNumber = agent_number
class UpdateObject:
    """Pair of ``old`` / ``new`` values describing a change.

    The two values are available as attributes and also through
    subscription: ``obj['old']`` and ``obj['new']``.
    """

    def __init__(
        self,
        old: str,
        new: str,
    ):
        """Store the previous value as ``old`` and the updated one as ``new``."""
        self.old = old
        self.new = new

    def __getitem__(self, key):
        """Return ``self.old`` for ``'old'`` and ``self.new`` for ``'new'``.

        Raises:
            KeyError: If *key* is neither ``'old'`` nor ``'new'``.
        """
        if key == 'old':
            return self.old
        if key == 'new':
            return self.new
        raise KeyError(f"Key '{key}' does not exist in UpdateObject")
class ApplicationUpdate:
    """Notification payload model for an application update.

    ``application_id`` and ``creation_user_id`` are required;
    ``submitted_date`` is optional. Values are exposed as camelCase
    attributes.
    """

    def __init__(
        self,
        application_id: int,
        creation_user_id: int,
        submitted_date: Optional[date] = None,
    ):
        """Store the given values.

        Args:
            application_id: Stored as ``applicationId``.
            creation_user_id: Stored as ``creationUserId``.
            submitted_date: Stored as ``submittedDate``; defaults to ``None``.
        """
        self.applicationId = application_id
        self.creationUserId = creation_user_id
        self.submittedDate = submitted_date
# ========== Get Billing Service Notification Payloads ===============
def _parse_optional_datetime(value) -> Optional[datetime]:
    """Parse *value* with ``dateutil.parser.parse``; return ``None`` if it is falsy."""
    return parser.parse(value) if value else None


def get_payment_request_info(payload) -> Optional[PaymentRequest]:
    """Build a ``PaymentRequest`` from the ``"paymentRequest"`` entry of *payload*.

    Args:
        payload: Mapping-like object supporting ``.get``. Its
            ``"paymentRequest"`` value, when present, must itself support
            ``.get`` and may hold an ``"entities"`` iterable of mappings.

    Returns:
        The populated ``PaymentRequest``, or ``None`` when ``payload`` has no
        ``"paymentRequest"`` entry or that entry is falsy (``None``, ``{}``).
        ``entities`` is always a list on the returned object; it is empty when
        the payload has no entities.

    Raises:
        Whatever ``dateutil.parser.parse`` raises for a non-empty date-time
        value it cannot parse.
    """
    request_data = payload.get("paymentRequest")
    if not request_data:
        # Nothing to build; say so explicitly instead of falling off the end.
        return None

    # A missing or empty "entities" value yields an empty list.
    entities = [
        Entity(
            total_amount=item.get("totalAmount"),
            entity_type=item.get("entityType"),
            entity_id=item.get("entityId"),
        )
        for item in request_data.get("entities") or []
    ]

    return PaymentRequest(
        payment_request_id=request_data.get("id"),
        account_id=request_data.get("accountId"),
        status_id=request_data.get("statusId"),
        request_id=request_data.get("requestId"),
        currency_id=request_data.get("currencyId"),
        gateway_payment_request_id=request_data.get("gatewayPaymentRequestId"),
        change_status_datetime_utc=_parse_optional_datetime(
            request_data.get("changeStatusDateTimeUTC")
        ),
        payment_request_datetime_utc=_parse_optional_datetime(
            request_data.get("paymentRequestDateTimeUTC")
        ),
        creation_datetime_utc=_parse_optional_datetime(
            request_data.get("creationDateTimeUTC")
        ),
        creation_user_id=request_data.get("creationUserId"),
        entities=entities,
        total_amount=request_data.get("totalAmount"),
    )
# ============= Build Billing Service Notification object ============
def get_billing_notification(payload) -> BillingNotification:
    """Build a ``BillingNotification`` from a billing service payload.

    Args:
        payload: Mapping-like object supporting ``.get``, keyed with the
            camelCase names read below.

    Returns:
        A ``BillingNotification`` whose date-time fields are parsed with
        ``dateutil.parser.parse`` (or ``None`` when the payload value is
        missing or falsy) and whose ``paymentRequest`` is whatever
        ``get_payment_request_info(payload)`` returns.
    """

    def _timestamp(field_name):
        # Only parse values that are present and non-empty.
        raw_value = payload.get(field_name)
        return parser.parse(raw_value) if raw_value else None

    return BillingNotification(
        notification_id=payload.get("id"),
        request_id=payload.get("requestId"),
        payment_request_id=payload.get("paymentRequestId"),
        payment_request_event_type_id=payload.get("paymentRequestEventTypeId"),
        payload=payload.get("payload"),
        gateway_event_id=payload.get("gatewayEventId"),
        gateway_creation_datetime_utc=_timestamp("gatewayCreationDateTimeUTC"),
        creation_datetime_utc=_timestamp("creationDateTimeUTC"),
        payment_processed_datetime_utc=_timestamp("paymentProcessedDateTimeUTC"),
        payment_request=get_payment_request_info(payload),
    )
def get_application_creation(payload) -> ApplicationCreation:
    """Build an ``ApplicationCreation`` from an application-creation payload.

    Args:
        payload: Mapping-like object supporting ``.get`` with the keys
            ``"applicationId"``, ``"submittedDate"`` (``YYYY-MM-DD`` string)
            and ``"creationUserId"``.

    Returns:
        The populated ``ApplicationCreation``.

    Raises:
        TypeError: If ``"submittedDate"`` is missing (``strptime`` receives
            ``None``).
        ValueError: If ``"submittedDate"`` does not match ``%Y-%m-%d``.
    """
    # NOTE(review): strptime returns a datetime (midnight), while
    # ApplicationCreation annotates submitted_date as date -- confirm whether
    # callers expect a plain date here.
    submitted_on = datetime.strptime(payload.get("submittedDate"), "%Y-%m-%d")
    application_id = payload.get("applicationId")
    creator_id = payload.get("creationUserId")

    return ApplicationCreation(
        application_id=application_id,
        submitted_date=submitted_on,
        creation_user_id=creator_id,
    )
def get_application_update(payload) -> ApplicationUpdate:
    """Build an ``ApplicationUpdate`` from an application-update payload.

    Args:
        payload: Mapping-like object supporting ``.get`` with the keys
            ``"applicationId"`` and ``"creationUserId"``.

    Returns:
        The populated ``ApplicationUpdate``. ``submittedDate`` is left at its
        default of ``None``.
    """
    application_id = payload.get("applicationId")
    creator_id = payload.get("creationUserId")

    # "submittedDate" is not read from the payload (its parsing is disabled),
    # so submitted_date is not passed and keeps its default.
    return ApplicationUpdate(
        application_id=application_id,
        creation_user_id=creator_id,
    )
def get_agent_creation(payload) -> AgentCreation:
    """Build an ``AgentCreation`` from an agent-creation payload.

    Args:
        payload: Mapping-like object supporting ``.get`` with the keys
            ``"agentId"``, ``"agentNumber"`` and ``"statusId"``.

    Returns:
        The populated ``AgentCreation``; keys absent from the payload yield
        ``None`` for the corresponding attribute.
    """
    agent_id = payload.get("agentId")
    agent_number = payload.get("agentNumber")
    status_id = payload.get("statusId")

    return AgentCreation(
        agent_id=agent_id,
        agent_number=agent_number,
        status_id=status_id,
    )