Home
This internal wiki serves as a comprehensive documentation source for the Appeals Consumer application. The application is designed to interact with Kafka topics, process incoming messages, and relay them to the Caseflow system.
The Appeals Consumer application facilitates real-time data processing and integration between systems by consuming messages from designated Kafka topics, processing these messages according to business logic, and forwarding them to Caseflow.
Detailed here are the architectural underpinnings of the Appeals Consumer application, supplemented by UML diagrams that elucidate its design and interactions.
The JSON schemas sent to Caseflow, with examples of potential payloads after event consumption:

- Initial Decision Review Creation: `phase_1_final_caseflow_schema_examples_parameters_V4.json`
- Decision Review Creation with Remand: `phase_2_remand_caseflow_schema_examples_parameters_V1.json`
- Decision Review Complete: `phase_2_caseflow_schema_example_parameters_decision_review_completed_V1.json`
Consumers are responsible for listening to Kafka topics and retrieving messages. They are designed to be highly available and scalable to handle varying loads.

- Functionality: Consumers are called through the topic routing defined within the `karafka.rb` config file in the root of the project directory. They handle the consumption of messages from the associated topic in bulk. The `messages` keyword points to a custom Karafka array-like object which contains metadata about the message as well as the decoded message payload from our `AvroDeserializerService`. Raising an exception from the consumer will prevent the Kafka message from being marked as "consumed". This is handled by an `offset` value that is unique to the consumer and topic. If the message is not marked as consumed and the offset is not updated, the consumer will continue to make attempts at consuming the message at that offset. The consumer is responsible for handling errors and retry logic for consuming messages.
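As a rough illustration of the routing described above, a `karafka.rb` config typically looks something like the following sketch. The broker address, client id, and the topic and consumer names here are hypothetical stand-ins, not taken from this repository:

```ruby
# karafka.rb (sketch) -- maps each Kafka topic to its consumer class.
# Names below are illustrative only.
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { "bootstrap.servers": ENV.fetch("KAFKA_BROKERS", "localhost:9092") }
    config.client_id = "appeals_consumer"
  end

  routes.draw do
    # Messages arriving on this topic are handed to this consumer in bulk.
    topic :decision_review_created do
      consumer DecisionReviewCreatedConsumer
    end
  end
end
```
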
- Key Methods:
  - `consume` is the public interface for the consumers. This method is called when one or more messages are found while polling the associated topic, and it is our entry point for handling the incoming messages.
  - `process_event` is a topic-agnostic method defined in the parent `ApplicationConsumer` class. It handles storing the event in the database if the event is a new record, then drops the newly created event into a job processing SQS queue. The `process_event` method takes two arguments and a code block. The first argument should be a reference to an ActiveRecord Event object. The second should be a hash including any extra details for logging purposes. The block should contain the `.perform_later` call for the processing job associated with that specific event.
  - There are various logging methods defined in the parent `ApplicationConsumer` that handle logging to the `Karafka.logger` throughout the life of the message consumption.
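The `consume`/`process_event` flow above can be sketched in plain Ruby. This is a simplified stand-in, not the real implementation: the `Event` struct replaces the ActiveRecord model, the block's queue append replaces the `.perform_later` call, and the consumer name is hypothetical.

```ruby
# Plain-Ruby sketch of the consume -> process_event flow (no Karafka,
# ActiveRecord, or ActiveJob; stand-ins are marked in comments).
Event = Struct.new(:type, :payload, :state)

class ApplicationConsumer
  # Topic-agnostic helper: persist the event if it is new, then run the
  # block, which stands in for the job's `.perform_later` call.
  def process_event(event, extra_details = {})
    return :duplicate unless event.state.nil?  # already stored
    event.state = "not_started"                # "save" the new record
    yield event                                # enqueue the processing job
    :enqueued
  end
end

class DecisionReviewCreatedConsumer < ApplicationConsumer
  attr_reader :queue

  def initialize
    @queue = []
  end

  # Public entry point: called with the batch of messages from the topic.
  def consume(messages)
    messages.map do |payload|
      event = Event.new("DecisionReviewCreated", payload, nil)
      process_event(event, { note: "example" }) { |e| @queue << e }
    end
  end
end

consumer = DecisionReviewCreatedConsumer.new
results = consumer.consume([{ "claim_id" => 1 }, { "claim_id" => 2 }])
```

Raising inside `consume` before the batch finishes is what keeps the offset from advancing in the real consumer, which is why error handling lives at this layer.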
Processing Jobs take the data ingested by Consumers and apply business logic to process and transform the data before sending it to Caseflow.

- Workflow:
  - `init_setup` sets the system admin on the RequestStore, which is needed for BIS calls during the processing of the event. It also sets an `@event` instance variable.
  - If the event that is queued to be processed is already in an end state ("processed" or "failed"), the job will return early as there is no need to continue the processing attempt.
  - `start_processing!` updates the event's `state` to "in_progress" and generates an `event_audit` for this processing attempt.
  - `@event.process` encapsulates all of the logic to process the event. This is a method that needs to be defined for every type of event on the child polymorphic event model.
  - `complete_processing!` updates the event and event_audit to a completed state.
- Reusability: All of the processing job logic is defined in a parent `BaseEventProcessingJob`. Whenever there are additional business logic needs before or after the existing logic, the `perform` method is intended to be extended through the use of the `super` keyword.
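The lifecycle and the `super` extension point can be sketched as follows. Method names mirror the wiki, but the bodies are simplified stand-ins (no Rails, RequestStore, or real `event_audit` rows), and the child job name is hypothetical:

```ruby
# Sketch of the event-processing job lifecycle described above.
class SketchEvent
  attr_accessor :state, :audits

  def initialize(state = nil)
    @state = state
    @audits = []
  end

  # Per-event-type business logic lives here on the real polymorphic model.
  def process; end
end

class BaseEventProcessingJob
  END_STATES = %w[processed failed].freeze

  def perform(event)
    @event = event                           # init_setup (simplified)
    return :already_finished if END_STATES.include?(event.state)

    start_processing!
    @event.process
    complete_processing!
  end

  private

  def start_processing!
    @event.state = "in_progress"
    @event.audits << :event_audit_created    # stand-in for an event_audit row
  end

  def complete_processing!
    @event.state = "processed"
  end
end

# Extending `perform` with extra business logic via `super`:
class DecisionReviewCreatedJob < BaseEventProcessingJob
  def perform(event)
    # extra setup before the shared lifecycle runs would go here
    super
  end
end

job = DecisionReviewCreatedJob.new
event = SketchEvent.new
job.perform(event)
```
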
- `app/consumers/application_consumer.rb` contains methods that are helpful for consuming any topic message, including `process_event` and custom logging methods.
- `app/jobs/base_event_processing_job.rb` contains the basic logic for tracking event processing, including the database updates for event and event_audit along with logging and error handling.
- `app/models/builders/base_dto_builder.rb` is a parent component that includes PII scrubbing for data transfer objects.
- `app/models/event.rb` contains useful methods for event processing, including state and timestamp updates, checks for failed vs. errored states, and determining the associated job by constantizing concatenated strings.
- `app/models/event_audit.rb` also includes methods for updating statuses and timestamps, as well as a scope for finding "stuck" jobs.
- `app/services/logger_service.rb` is a custom logger that unifies the formatting for all messages sent to the `Rails.logger`. It can be added to any file by including the associated `LoggerMixin` at the top of the class or by initializing `LoggerService.new(caller_class)` directly with the class name of the caller as an argument.
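The two usage patterns for the logger can be sketched like this. Only the usage pattern (mixin include vs. direct `LoggerService.new(caller_class)`) comes from the wiki; the internals, the `info` method, and the output format shown here are assumptions for illustration:

```ruby
# Sketch of the LoggerService usage patterns (internals are assumed).
class LoggerService
  def initialize(caller_class)
    @caller_class = caller_class
  end

  # The real service would forward to Rails.logger with unified formatting;
  # here we just return the formatted string.
  def info(message)
    "[#{@caller_class}] INFO: #{message}"
  end
end

module LoggerMixin
  # Lazily builds a logger tagged with the including class's name.
  def logger
    @logger ||= LoggerService.new(self.class.name)
  end
end

class ExampleProcessor
  include LoggerMixin

  def run
    logger.info("processing started")
  end
end
```
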
Instructions for new team members to set up their development environment.