
feat: Ingestion TaskQueue #3136

Closed · wants to merge 73 commits

Conversation

@taimingl (Collaborator) commented Apr 2, 2024

Implements #3099.

Adds a TaskQueue that serves as an IngestBuffer to improve the responsiveness of the ingestion endpoints when ingestion requests spike.

Basic flow:

Producers: the ingestion endpoint handlers (/_json or /_bulk). Each handler

  1. constructs a Task from the request payload and buffers it into the TaskQueue's channel, then
  2. responds to the client and listens for upcoming requests.

The call chain (a code sketch follows the tree):

"/_json"
└── pub async fn send_task(task: IngestEntry)
    └── TaskQueue::send_task(&self, task: IngestEntry)
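A minimal sketch of this producer path, assuming the `async-channel` crate; only the names come from the tree above, while `IngestEntry`'s fields and the function bodies are illustrative assumptions:

```rust
use std::sync::Arc;

use async_channel::{SendError, Sender};

// Placeholder for the PR's buffered-request type; the real fields are in the PR.
#[derive(Clone)]
pub struct IngestEntry {
    pub payload: Vec<u8>,
}

pub struct TaskQueue {
    sender: Arc<Sender<IngestEntry>>,
}

impl TaskQueue {
    // Buffer the task into the bounded channel. The await only blocks
    // (applies backpressure) when the channel is at capacity; otherwise
    // the handler can respond to the client immediately.
    pub async fn send_task(&self, task: IngestEntry) -> Result<(), SendError<IngestEntry>> {
        self.sender.send(task).await
    }
}

// Hypothetical "/_json" handler body: construct the task from the request
// payload, buffer it, and return without waiting for ingestion to finish.
pub async fn send_task(queue: &TaskQueue, payload: Vec<u8>) -> Result<(), SendError<IngestEntry>> {
    queue.send_task(IngestEntry { payload }).await
}
```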

Consumers: async workers managed by the TaskQueue. Each worker

  1. takes Tasks out of the TaskQueue's channel in batches,
  2. persists the received tasks to disk,
  3. processes (ingests) each task, and
  4. updates the persisted disk file.

The call chain (a code sketch follows the tree):

Workers::new(count: usize, receiver: Arc<Receiver<IngestEntry>>)
└── fn init_worker(receiver: Arc<Receiver<IngestEntry>>)
    ├── async fn process_job(worker_id: String, receiver: Arc<Receiver<IngestEntry>>, store_sig_s: Sender<Option<Vec<IngestEntry>>>)
    │   └── async fn process_tasks(worker_id: &str, tasks: &Vec<IngestEntry>)
    └── async fn persist_job(worker_id: String, store_sig_r: Receiver<Option<Vec<IngestEntry>>>)
        └── fn persist_job_inner(path: &PathBuf, tasks: Option<Vec<IngestEntry>>)
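A sketch of the consumer loop, reusing the placeholder `IngestEntry` from the producer sketch; the constant values, error handling, and persist-signal protocol are assumptions that mirror steps 1-4 above:

```rust
use std::sync::Arc;
use std::time::Duration;

use async_channel::{Receiver, Sender, TryRecvError};

const WORKER_BATCH_PROCESSING_SIZE: usize = 100; // assumed value
const WORKER_DEFAULT_WAIT_TIME: u64 = 1; // seconds, assumed value

async fn process_job(
    worker_id: String,
    receiver: Arc<Receiver<IngestEntry>>,
    store_sig_s: Sender<Option<Vec<IngestEntry>>>,
) {
    loop {
        // 1. take tasks out of the channel in a batch
        let mut batch = Vec::with_capacity(WORKER_BATCH_PROCESSING_SIZE);
        while batch.len() < WORKER_BATCH_PROCESSING_SIZE {
            match receiver.try_recv() {
                Ok(task) => batch.push(task),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Closed) => return, // queue shut down
            }
        }
        if batch.is_empty() {
            tokio::time::sleep(Duration::from_secs(WORKER_DEFAULT_WAIT_TIME)).await;
            continue;
        }
        // 2. hand the batch to the persist job so it reaches disk first
        let _ = store_sig_s.send(Some(batch.clone())).await;
        // 3. process (ingest) each task
        process_tasks(&worker_id, &batch).await;
        // 4. tell the persist job to clear the now-ingested batch
        let _ = store_sig_s.send(None).await;
    }
}

// Stub for the actual ingestion logic.
async fn process_tasks(worker_id: &str, tasks: &[IngestEntry]) {
    println!("{worker_id}: ingesting {} tasks", tasks.len());
}
```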

Components

TaskQueue: an MPMC queue over a single bounded channel, implemented with the `async-channel` crate

struct TaskQueue {
    sender: Arc<Sender<IngestEntry>>,
    workers: Arc<Workers>,
    current_size: usize,
}

Main methods (the worker-scaling check is sketched after the configuration list):

async fn send_task(&self, task: IngestEntry)
async fn should_add_more_workers(&self)

Configured by:

  1. DEFAULT_CHANNEL_CAP: bounds the channel's capacity
  2. DEFAULT_WORKER_CNT: default initial & incremental worker count
  3. SEARCHABLE_LATENCY: acceptable latency between when a request is accepted and when it becomes searchable
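Extending the `TaskQueue` sketch above, `should_add_more_workers` might be a capacity heuristic like the following; the constant values and the half-full threshold are assumptions, not the PR's code (`async-channel` does expose `Sender::len()`):

```rust
const DEFAULT_CHANNEL_CAP: usize = 4096; // assumed value
const DEFAULT_WORKER_CNT: usize = 3; // assumed value

impl TaskQueue {
    // Hypothetical heuristic: a channel that is more than half full means
    // the current workers are falling behind, so the caller should spawn
    // another increment of DEFAULT_WORKER_CNT workers.
    async fn should_add_more_workers(&self) -> bool {
        self.sender.len() > DEFAULT_CHANNEL_CAP / 2
    }
}
```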

Workers: consumer side of the TaskQueue

pub(super) struct Workers {
    pub receiver: Arc<Receiver<IngestEntry>>,
    pub handles: RwVec<Worker>,
}

Main methods:

async fn process_job(...)
async fn process_tasks(...)

Configured by:

  1. WORKER_DEFAULT_WAIT_TIME: default wait time between pulls from the channel
  2. WORKER_BATCH_PROCESSING_SIZE: max number of requests a worker processes in one batch
  3. WORKER_MAX_IDLE_TIME: max idle time in seconds before a worker shuts itself down (sketched below)
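The idle shutdown could be a check like this inside the worker loop; the constant's value is an assumption:

```rust
use std::time::{Duration, Instant};

const WORKER_MAX_IDLE_TIME: u64 = 600; // seconds, assumed value

// Inside the worker loop: remember when the last batch arrived and exit
// once the idle budget is spent, so workers added during a spike drain
// away once traffic subsides.
fn worker_should_exit(last_task_at: Instant) -> bool {
    last_task_at.elapsed() > Duration::from_secs(WORKER_MAX_IDLE_TIME)
}
```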

Other utility functions (a replay sketch in code follows the tree):

async fn replay_persisted_tasks()
├── fn decode_from_wal_file(wal_file: &PathBuf)
└── async fn process_tasks(worker_id: &str, tasks: &Vec<IngestEntry>)
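A sketch of the replay path, reusing the earlier placeholders; the directory argument, file handling, and encoding are assumptions (per the commits below, the real implementation also decompresses), so `decode_from_wal_file` is a stub:

```rust
use std::fs;
use std::path::PathBuf;

// On startup, walk the persisted WAL files, decode the buffered entries,
// and run them through the same processing path as live tasks.
async fn replay_persisted_tasks(dir: &PathBuf) -> std::io::Result<()> {
    for entry in fs::read_dir(dir)? {
        let wal_file = entry?.path();
        let tasks = decode_from_wal_file(&wal_file)?;
        process_tasks("replay-worker", &tasks).await;
        fs::remove_file(&wal_file)?; // replayed; the file is no longer needed
    }
    Ok(())
}

// Stub: the real implementation decompresses and deserializes here.
fn decode_from_wal_file(wal_file: &PathBuf) -> std::io::Result<Vec<IngestEntry>> {
    let bytes = fs::read(wal_file)?;
    Ok(vec![IngestEntry { payload: bytes }])
}
```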

taimingl added 20 commits May 5, 2024 14:59
- Memory: add `ZO_INGEST_BUFFER_QUEUE_CNT` to configure max_worker_count, which limits total memory usage
- Latency: track a running average of request size, used to estimate wait time and keep latency within the configured constraint
Add the current time as the timestamp for buffered ingestion requests. This value is used when a request without a timestamp in its payload is ingested, and it also helps preserve the order of the logs.
Compress serialized data when persisting requests to the WAL on disk, and decompress it when reading back.
@taimingl (Collaborator, Author) commented:
/ok-to-test sha=578578a
