
Fixed AzureLogHandler with multiple processes. #1158

Draft
wants to merge 5 commits into master

Conversation

JeremyVriens (Contributor)

These changes fix both #900 and #928, but I'm not sure about compatibility with Python 2.
I changed the standard queue.Queue to a multiprocessing.Queue and converted the LogRecord to an Envelope before putting it on the queue, since serializing a LogRecord fails when it contains a traceback.
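The serialization problem can be reproduced in isolation with the standard library alone; a minimal sketch (independent of opencensus) showing that a LogRecord carrying a traceback cannot be pickled, which is what multiprocessing.Queue does under the hood:

```python
import logging
import pickle
import sys

# A plain LogRecord pickles fine.
plain = logging.LogRecord("demo", logging.INFO, __file__, 1, "hello", None, None)
pickle.dumps(plain)

# A LogRecord whose exc_info holds a traceback does not, because
# traceback objects are not picklable.
try:
    raise ValueError("boom")
except ValueError:
    with_tb = logging.LogRecord(
        "demo", logging.ERROR, __file__, 1, "failed", None, sys.exc_info()
    )

try:
    pickle.dumps(with_tb)
    picklable = True
except TypeError:
    picklable = False

print(picklable)  # False
```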

@google-cla

google-cla bot commented Aug 23, 2022

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up-to-date status, view the checks section at the bottom of the pull request.

@lzchen (Contributor) commented Aug 23, 2022

@JeremyVriens
Thanks for the contribution! Please sign the CLA when you have the time.

@@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent):
 class Queue(object):
     def __init__(self, capacity):
         self.EXIT_EVENT = QueueExitEvent('EXIT')
-        self._queue = queue.Queue(maxsize=capacity)
+        self._queue = multiprocessing.Queue(maxsize=capacity)
Contributor

queue.Queue was designed for concurrent environments spawned with the threading module, which is how the Azure exporters behave. With that said, we probably shouldn't replace the default queue, since the original use case is those threaded environments. I suggest adding an option here that lets the user configure which type of queue they want, and then creating the corresponding queue type.
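That suggestion might be sketched roughly as follows; this is a minimal sketch, and the option name `use_multiprocessing` is hypothetical, not something the PR defines:

```python
import multiprocessing
import queue


class Queue(object):
    # Keep queue.Queue as the default and let the user opt into a
    # process-shared queue. `use_multiprocessing` is a hypothetical
    # option name illustrating the reviewer's suggestion.
    def __init__(self, capacity, use_multiprocessing=False):
        if use_multiprocessing:
            self._queue = multiprocessing.Queue(maxsize=capacity)
        else:
            self._queue = queue.Queue(maxsize=capacity)
```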

Contributor Author

It does work with threading; however, in an environment where multiple processes are used (see the linked bug reports), the queue is not shared between the processes. A multiprocessing.Queue is therefore required to make this work.
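A minimal stdlib-only demonstration of the difference: an item put on a multiprocessing.Queue by a child process reaches the parent, whereas with queue.Queue each process would only see its own copy:

```python
import multiprocessing


def worker(q):
    # Runs in a separate process; the item still reaches the parent
    # because multiprocessing.Queue is backed by a pipe.
    q.put("from child")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    print(q.get(timeout=5))  # from child
```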

        # Convert the raw LogRecord to an envelope before putting it on the
        # queue as a LogRecord object is not serializable, while an Envelope
        # object is.
        envelope = self.log_record_to_envelope(record)
        self._queue.put(envelope, block=False)
Contributor

Why does the object need to be serializable?

Contributor Author

In order to share messages/resources between different processes, the multiprocessing.Queue pickles the object so that it can be sent over a pipe: https://github.com/python/cpython/blob/3.10/Lib/multiprocessing/queues.py#L244

@@ -75,8 +75,7 @@ def __init__(self, **options):
     def _export(self, batch, event=None):  # pragma: NO COVER
Contributor

Please add an entry to the CHANGELOG

@@ -75,8 +75,7 @@ def __init__(self, **options):
     def _export(self, batch, event=None):  # pragma: NO COVER
         try:
             if batch:
-                envelopes = [self.log_record_to_envelope(x) for x in batch]
-                envelopes = self.apply_telemetry_processors(envelopes)
+                envelopes = self.apply_telemetry_processors(batch)
Contributor

This doesn't work because apply_telemetry_processors is expecting an Envelope data type.

Contributor Author

It does work: the batch is a list of Envelope objects, since I already converted them before putting them on the queue.

        # Convert the raw LogRecord to an envelope before putting it on the
        # queue as a LogRecord object is not serializable, while an Envelope
        # object is.
        envelope = self.log_record_to_envelope(record)
Contributor

I think there are performance implications of doing this. We don't want to be mapping every time logger.X() is called.

Contributor Author

There is indeed a performance cost here. However, it is not possible to put a raw LogRecord on the multiprocessing.Queue, since the queue pickles the object.
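The per-call cost being discussed can be ballparked with a stand-in conversion; `to_envelope_dict` below is a hypothetical stand-in for log_record_to_envelope, not the real mapping:

```python
import logging
import timeit

record = logging.LogRecord(
    "demo", logging.INFO, __file__, 1, "hello %s", ("world",), None
)


def to_envelope_dict(rec):
    # Hypothetical stand-in for log_record_to_envelope: copy the fields
    # an exporter typically needs into a plain, picklable dict.
    return {
        "name": rec.name,
        "severity": rec.levelname,
        "message": rec.getMessage(),
        "timestamp": rec.created,
    }


# Rough per-call cost of converting on every logger.X() call.
per_call = timeit.timeit(lambda: to_envelope_dict(record), number=10_000) / 10_000
print(f"~{per_call * 1e6:.1f} microseconds per conversion")
```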

@JeremyVriens (Contributor Author)

> @JeremyVriens
> Thanks for the contribution! Please sign the CLA when you have the time.

I did, but I made a commit with the wrong git email config. I'll try to fix this ASAP when I have time.

@JeremyVriens (Contributor Author)

This is a hard issue to tackle (as you can tell from the bug report discussions), and I'm not sure whether there's a different or better way of sharing the queue between processes.
I agree that the performance implication is undesirable, but I think it's inevitable when working with multiple processes. I will spend some more time to see if I can make it configurable (perhaps an option like multiprocessing=True/False, defaulting to False); at least for now, this is a working PoC.

@JeremyVriens JeremyVriens marked this pull request as draft August 23, 2022 18:19
@Korred

Korred commented Aug 23, 2022

@JeremyVriens thanks for your PoC! I came across issue #900 a few days ago while trying to pass logs from Celery to Azure Application Insights. Seeing that it was first reported two years ago, I honestly didn't expect it to be picked up and fixed any time soon. Getting this properly fixed would be really helpful for a lot of people with this kind of setup!

@mockodin

mockodin commented Apr 25, 2023

This works great as a monkey patch... any plans to complete this?

I don't believe Python 2 should be a concern at this point.

@JeremyVriens it looks like this is largely just pending the CLA check. Alternatively, I suppose someone could resubmit it if you're no longer watching this?
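For reference, the monkey patching mentioned above might look roughly like this; `ScheduleQueue` is a stand-in for the library's internal queue wrapper, and the `_queue` attribute name is an assumption based on the diff in this PR:

```python
import multiprocessing
import queue


class ScheduleQueue:
    # Stand-in for the library's internal queue wrapper (assumed shape).
    def __init__(self, capacity):
        self._queue = queue.Queue(maxsize=capacity)


_original_init = ScheduleQueue.__init__


def _patched_init(self, capacity):
    _original_init(self, capacity)
    # Swap in a process-shared queue after construction, mirroring the
    # change this PR makes inside the library itself.
    self._queue = multiprocessing.Queue(maxsize=capacity)


ScheduleQueue.__init__ = _patched_init
```

With the real library, the same idea would be applied to its queue class at import time, before any handlers are created.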

@burkol

burkol commented Jun 5, 2023

Hi All,

I faced the same issue.
Is there a possibility that it will be accepted in the near future?

Regards,
Zoltan

@vvxhid

vvxhid commented Oct 16, 2023

May I ask for an estimated merge date for this pull request? Thanks!


6 participants