Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support LogProcessors chaining through mutable reference #1726

Open
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

lalitb
Copy link
Member

@lalitb lalitb commented May 9, 2024

Fixes #1650
As discussed here, this PR modifies LogProcessor to take mutable reference to LogData, and further pass it as Owned/Borrowed type to the Exporter.

Existing:

pub trait LogProcessor: Send + Sync + Debug {
    // ...
    fn emit(&self, data: LogData);
   // ...
}

#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
    // ... 
    async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
   // ..

This PR:

pub trait LogProcessor: Send + Sync + Debug {
    // ...
    fn emit(&self, data: &mut LogData);
   // ..
}

#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
   // ...
   async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>
  // ...
}
  • BatchLogProcessor clones the received mutable references of LogData before enqueueing them and passes(i.e moves) this cloned data to the exporter
  • SimpleLogProcessor passes received mutable references of LogData directly to the exporter as borrowed type, without cloning.
  • Exporters accept a slice of LogData references and process them accordingly.
  • These changes:
    - Enables passing modified LogData between processors in a chain.
    - Allows LogData to be passed from the SDK to the exporter without cloning, such as with SimpleLogProcessor and for exporters like opentelemetry-user-events-logs and opentelemetry-etw-logs that use a reentrant log processor.

This also adds certain constraints for implementing custom exporters and log-processors, else their compilation will fail:
Exporters:
The lifetime parameter 'a in Vec<Cow<'a, LogData>> ensures that the exporter do not retain references to the logdata beyond its intended lifespan. If there is a need to modify or store LogData beyond the duration of the export call, the data must be cloned.

Asynchronous Log-Processors:
When a LogProcessor processes LogData asynchronously, such as in the case of BatchLogProcessor, there is a risk that the data may outlive the scope from which it was passed. To prevent any issues, LogData should always be cloned before any asynchronous processing begins.

Synchronous Log Processors:
For synchronous processing within a LogProcessor, as in SimpleLogProcessor, it is important to ensure that references to LogData are contained within the bounds of the emit function. The LogData should be passed as a reference if all processing is completed within the emit function.

Benchmark:
with PR: full-log-with-attributes/with-context: 317.76 ns
main repo:full-log-with-attributes/with-context: 431.24 ns

Stress test
With PR:
Number of threads: 20
Throughput: 40,814,600 iterations/sec
Throughput: 40,845,400 iterations/sec
Throughput: 40,541,400 iterations/sec
Throughput: 40,676,800 iterations/sec
Throughput: 35,816,800 iterations/sec

main repo:
Number of threads: 20
Throughput: 26,506,800 iterations/sec
Throughput: 27,819,200 iterations/sec
Throughput: 28,112,400 iterations/sec
Throughput: 27,798,000 iterations/sec

Changes

Please provide a brief description of the changes here.

Merge requirement checklist

  • CONTRIBUTING guidelines followed
  • Unit tests added/updated (if applicable)
  • Appropriate CHANGELOG.md files updated for non-trivial, user-facing changes
  • Changes in public API reviewed (if applicable)

@lalitb lalitb requested a review from a team as a code owner May 9, 2024 00:22
@lalitb
Copy link
Member Author

lalitb commented May 9, 2024

Also the result of stress test. There is perf improvement as we no longer do clone in log_emitter.rs.

Existing:

Number of threads: 16
Throughput: 36,961,600 iterations/sec
Throughput: 37,929,800 iterations/sec

PR:

Number of threads: 16
Throughput: 64,182,600 iterations/sec
Throughput: 64,921,000 iterations/sec
Throughput: 63,776,800 iterations/sec

will compare and add bench too.

@lalitb
Copy link
Member Author

lalitb commented May 9, 2024

Benchmark

Test Name Main PR
simple-log/no-context [128.97 ns] [89.746 ns]
simple-log/with-context time [130.56 ns] [89.683 ns]
simple-log-with-int/no-context [171.34 ns] [106.89 ns]
simple-log-with-int/with-context [ 173.99 ns] [106.38 ns]
simple-log-with-double/no-context [172.04 ns] [106.54 ns]
simple-log-with-double/with-context [173.93 ns] [106.53 ns]
simple-log-with-string/no-context [172.15 ns] [107.70 ns]
simple-log-with-string/with-context [174.03 ns] [106.98 ns]
simple-log-with-bool/no-context [174.00 ns] [111.34 ns]
simple-log-with-bool/with-context [178.27 ns] [109.40 ns]
simple-log-with-bytes/no-context [209.17 ns] [131.57 ns]
simple-log-with-bytes/with-context [211.31 ns] [131.33 ns]
simple-log-with-a-lot-of-bytes/no-context [216.39 ns] [126.85 ns]
simple-log-with-a-lot-of-bytes/with-context [218.21 ns] [124.83 ns]
simple-log-with-vec-any-value/no-context [285.70 ns] [165.38 ns]
simple-log-with-vec-any-value/with-context [282.09 ns] [169.18 ns]
simple-log-with-inner-vec-any-value/no-context [393.67 ns] [223.43 ns]
simple-log-with-inner-vec-any-value/with-context [391.62 ns] [224.96 ns]
simple-log-with-map-any-value/no-context [328.11 ns] [177.65 ns]
simple-log-with-map-any-value/with-context [329.72 ns] [176.94 ns]
simple-log-with-inner-map-any-value/no-context [482.51 ns] [257.19 ns]
simple-log-with-inner-map-any-value/with-context [484.81 ns] [252.65 ns]
long-log/no-context [128.64 ns] [90.043 ns]
long-log/with-context [129.22 ns] [89.758 ns]
full-log/no-context [140.29 ns] [98.945 ns]
full-log/with-context [139.50 ns] [99.310 ns]
full-log-with-4-attributes/no-context [311.59 ns] [196.60 ns]
full-log-with-4-attributes/with-context [299.92 ns] [190.10 ns]
full-log-with-9-attributes/no-context [517.14 ns] [365.58 ns]
full-log-with-9-attributes/with-context [527.35 ns] [367.39 ns]
full-log-with-attributes/no-context [421.30 ns] [248.99 ns]
full-log-with-attributes/with-context [426.44 ns] [239.60 ns]

@cijothomas
Copy link
Member

Thanks for looking into this! The perf win is amazing!

Would the below statements be accurate?

  1. LogProcessors now get a mutable ref to LogData. If they want to make changes to LogData, they are allowed to do so, but they need to be aware that it'll be reflected in other processors registered after them.
  2. If a LogProcessor is an Exporting processor, and it has to batch LogData, then it must clone the data. Due to cloning, any changes made to the original one after the processor is invoked won't be visible to this LogProcessor.
  3. Currently the clone cost is paid by the SDK itself. But this PR makes it possible to write pipeline without any cloning (simple/re-entrant processors who export sync.). BatchingProcessors need to clone, but that is no worse than current.
  4. The most common use case of 1 or more enrichment/redactor processor, followed by BatchProcessor would be much faster with this change. Infact this was not even feasible before as enrichment/redactions got applied to the clone only
  5. The high-perf use case of 1 or more enrichment/redactor processor, following by Re-entrantProcessors would be much much faster now as we completely eliminate cloning.

Robert's suggestion of offering the ability to run independent pipelines (one pipeline with redaction processor exporting redacted data, and another pipeline exporting without redaction is possible with this change also, but it requires cloning to ensure the changes are only affecting that pipeline only)

@lalitb
Copy link
Member Author

lalitb commented May 9, 2024

Would the below statements be accurate?

Nicely summarised the changes :) Yes to all.

Copy link

codecov bot commented May 21, 2024

Codecov Report

Attention: Patch coverage is 76.97842% with 32 lines in your changes are missing coverage. Please review.

Project coverage is 73.6%. Comparing base (35f9a60) to head (bde9131).

Files Patch % Lines
opentelemetry-sdk/src/logs/log_processor.rs 87.0% 13 Missing ⚠️
opentelemetry-otlp/src/exporter/http/logs.rs 0.0% 8 Missing ⚠️
opentelemetry-stdout/src/logs/exporter.rs 0.0% 6 Missing ⚠️
opentelemetry-otlp/src/exporter/tonic/logs.rs 0.0% 4 Missing ⚠️
opentelemetry-otlp/src/logs.rs 0.0% 1 Missing ⚠️
Additional details and impacted files
@@          Coverage Diff          @@
##            main   #1726   +/-   ##
=====================================
  Coverage   73.6%   73.6%           
=====================================
  Files        124     124           
  Lines      19517   19612   +95     
=====================================
+ Hits       14377   14447   +70     
- Misses      5140    5165   +25     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@lalitb lalitb changed the title [For discussion] Support LogProcessors chaining through mutable reference Support LogProcessors chaining through mutable reference May 21, 2024
@lalitb
Copy link
Member Author

lalitb commented May 21, 2024

Have added unit-test to validate LogProcessor chaining and updating of LogData. This is ready for review now. Also added changelog and benchmark in the description.


let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@utpilla pointed out that instrumentation data shouldn't be allowed to be modified within processors. Will create a separate issue to track that.

//TODO :avoid cloning when logdata is borrowed?
let owned_batch = batch
.iter()
.map(|&log_data| log_data.clone()) // Converts Cow to owned LogData
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no 🐮 now 🤣

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately, had to bring her back in latest commit. Too much milking needed in batchprocessor without that.

/// Exports a batch of [`LogData`].
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
/// Exports a batch of reference to `LogData`.
async fn export<'a>(&mut self, batch: &'a [&'a LogData]) -> LogResult<()>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the exported type a slice of ref to LogData? Previously, it was a concrete type Vec. Please call it out in changelog.
(I agree with this change, just making sure the change is aptly reflected in changelog)

Copy link
Member Author

@lalitb lalitb May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to revert the slice changes as they complicated the handling in the batch processor. Previously, the batch processor directly moved the logs from the queue to the export method. Now, we pass a reference to the logs, track which logs have been exported, and then delete them from the queue. Since the queue can receive new logs while the export is happening, it is necessary to keep track of the logs that were exported. Also, didn't see any improvement in stress and batch tests with the changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Investigate Potential to bypass LogRecord cloning when emitting LogRecord to LogProcessor(s)
2 participants