Introduce a cache for Publishers that tracks subscriptions to manage the cache #2861

Open · wants to merge 9 commits into main
Conversation

@mgodave (Contributor) commented Mar 5, 2024

Motivation:

Caching of Publishers comes up often, and managing the cache correctly can be tricky and error prone. Scenarios where caching Publishers is useful resemble those covered by the multicast and replay operators, but add the dimension of asynchronous access, for instance multiple requests that need to consume the same data.

Modifications:

Add a PublisherCache utility that manages the lifecycle of a cached Publisher. A Publisher is removed from the cache when it no longer has any subscriptions.
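
A rough usage sketch (the String key/value types and the greetingsFor method are made up for illustration; PublisherCache.multicast(), get(key, supplier), and Publisher.from come from this PR and the existing API):

final PublisherCache<String, String> cache = PublisherCache.multicast();

Publisher<String> greetingsFor(final String name) {
    // Concurrent callers asking for the same key share one cached Publisher; the entry is
    // dropped once the last subscriber cancels or the stream terminates.
    return cache.get(name, key -> Publisher.from("hello, " + key));
}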

return publisherCache.get(key).publisher;
}

final Holder<T> item2 = new Holder<>();
Contributor:

Is there a better name for item2?

});
}

private static final class Holder<T> {
Contributor:

I think this is here so that we can add it to the HashMap and avoid some hoop-jumping to manage reference equality for the map. A quick code comment might be helpful for future readers.
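
Something along these lines, perhaps (the field is an assumption about the Holder body):

// Mutable cell so an entry can be put into the map before its Publisher has been built and
// wrapped, and later looked up and removed by the identity of this Holder rather than by
// Publisher equality.
private static final class Holder<T> {
    Publisher<T> publisher;
}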

});

item2.publisher = multicastStrategy.apply(newPublisher)
        .liftSync(subscriber -> new Subscriber<T>() {
Contributor:

Do we need this second liftSync? I think it applies to the individual streams. In my mind's eye that means if a single stream ends it removes the underlying stream from the cache. I don't know how that would happen unless the parent Publisher completed and we remove it at that level as well.

Contributor Author:

It's there to handle subscriber errors and clean up if the subscriber is put into a bad state via a throw. In fact it looks like I missed a case with onNext.
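
For illustration, one shape such a cleanup wrapper could take (a sketch, not this PR's code; CleanupOnThrowSubscriber and removeFromCache are made-up names, and io.servicetalk.concurrent.PublisherSource is assumed to be imported):

private static final class CleanupOnThrowSubscriber<T> implements PublisherSource.Subscriber<T> {
    private final PublisherSource.Subscriber<? super T> delegate;
    private final Runnable removeFromCache;

    CleanupOnThrowSubscriber(final PublisherSource.Subscriber<? super T> delegate,
                             final Runnable removeFromCache) {
        this.delegate = delegate;
        this.removeFromCache = removeFromCache;
    }

    @Override
    public void onSubscribe(final PublisherSource.Subscription subscription) {
        delegate.onSubscribe(subscription);
    }

    @Override
    public void onNext(final T t) {
        try {
            delegate.onNext(t);
        } catch (Throwable cause) {
            // The downstream Subscriber is now in a bad state; evict the cached entry so a
            // later get() builds a fresh Publisher, then rethrow.
            removeFromCache.run();
            throw cause;
        }
    }

    @Override
    public void onError(final Throwable t) {
        removeFromCache.run();
        delegate.onError(t);
    }

    @Override
    public void onComplete() {
        removeFromCache.run();
        delegate.onComplete();
    }
}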

Contributor:

Is there a test case that would demonstrate their purpose? If I delete the second liftSync and move the synchronized to the first, everything still works as expected.

Contributor Author:

I will add one.

Member:

@mgodave - can you add a comment along the lines of:

The motivation for this being "after" the multicast is that multicast doesn't propagate cancellation upstream unless there are no subscribers (e.g. they all cancel). So we acquire the lock in cancel here, there are no async boundaries in multicast, and then we remove from the map in cancel "above" multicast. This prevents race conditions where someone does a get and we return a Publisher that has already been cancelled (because there are no subscribers).

Also consider breaking this out into a named (i.e. not anonymous, private/final) class, which is easier to look at when debugging larger operator chains.
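
For illustration, a sketch of what such a named class could look like (made-up names; assumes io.servicetalk.concurrent.PublisherSource.Subscription is imported):

private static final class LockedCancelSubscription implements PublisherSource.Subscription {
    private final PublisherSource.Subscription delegate;
    private final Object publisherCacheLock;

    LockedCancelSubscription(final PublisherSource.Subscription delegate,
                             final Object publisherCacheLock) {
        this.delegate = delegate;
        this.publisherCacheLock = publisherCacheLock;
    }

    @Override
    public void request(final long n) {
        delegate.request(n);
    }

    @Override
    public void cancel() {
        // multicast only propagates cancel upstream once its last subscriber has cancelled,
        // and it does so without crossing an async boundary, so the lock taken here is still
        // held when the operator "above" multicast removes the entry from the map. That keeps
        // get() from handing out a Publisher that has already been cancelled.
        synchronized (publisherCacheLock) {
            delegate.cancel();
        }
    }
}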


@Override
public void cancel() {
    try {
        assert Thread.holdsLock(publisherCache);
Member:

Consider adding a comment here too (the lock was acquired after the multicast; we need to be holding the lock here to interact with the map and prevent returning a cancelled Publisher).
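
One possible wording for that comment, next to the assertion (comment wording only, no new logic):

// The Subscription "below" the multicast acquired the cache lock before propagating cancel,
// so we must still be holding it here: we are about to touch the map, and doing so without
// the lock could let get() return a Publisher whose upstream has already been cancelled.
assert Thread.holdsLock(publisherCache);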

*/
public Publisher<T> get(final K key, final Function<K, Publisher<T>> publisherSupplier) {
    return Publisher.defer(() -> {
        synchronized (publisherCache) {
Member:

synchronized has been used in the past because it doesn't require additional allocations. However, Loom virtual threads don't support synchronized well (blocking inside a synchronized block pins the carrier thread); should we use Lock objects instead?
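
For illustration, a minimal sketch of the Lock-based variant (lookupOrCreate is a hypothetical stand-in for the existing body; uses java.util.concurrent.locks.ReentrantLock):

private final ReentrantLock lock = new ReentrantLock();  // instead of synchronized (publisherCache)

public Publisher<T> get(final K key, final Function<K, Publisher<T>> publisherSupplier) {
    return Publisher.defer(() -> {
        lock.lock();
        try {
            return lookupOrCreate(key, publisherSupplier);
        } finally {
            lock.unlock();
        }
    });
}

// The existing assert Thread.holdsLock(publisherCache) would become:
// assert lock.isHeldByCurrentThread();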

}

private void lockRemoveFromMap() {
    synchronized (publisherCache) {
Member:

Add a comment here to the effect:

  • completion of the first Subscriber after multicast means the multicast operator is in a terminal state and we therefore remove it from the map. There are cases where folks may want to re-subscribe to the Publisher (e.g. get the cached value, trigger another event), however that currently isn't supported and we favor bounding the size of the map, which has scope outside the operator chain.
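
Possible wording for that comment (a sketch):

// The single Subscriber sitting "after" multicast has terminated, so the multicast operator
// itself is in a terminal state and we remove the entry from the map. Re-subscribing to a
// cached, already-terminated Publisher (e.g. get the cached value, trigger another event)
// isn't currently supported; evicting here keeps the map, which has scope outside any one
// operator chain, bounded.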

* @param <T> the type of the {@link Publisher} contained in the cache.
* @return a new PublisherCache that will wrap cached values with the multicast operator.
*/
public static <K, T> PublisherCache<K, T> multicast() {
Member:

By exposing these static methods we may need to add a new factory method here for each operator overload. Did you consider instead exposing just a Function<> (or similar) so folks can apply the variant they want (see the sketch after this list)? Some risks maybe:

  • folks could apply operators that don't obey the assumptions (we could clarify the constraints the operator must abide by: allows multiple subscribers, cancels upstream only after no subscribers are present, no async cancel processing as the synchronization here depends upon it, ...)
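
For comparison, a rough sketch of that shape (names and the multicast(1) choice are illustrative; the PR's existing create() may already be close to this):

// Callers supply the wrapping operator; PublisherCache documents the contract it must obey
// (allows multiple subscribers, cancels upstream only once no subscribers remain, no async
// cancel processing, since the synchronization depends on it).
public static <K, T> PublisherCache<K, T> create(
        final Function<Publisher<T>, Publisher<T>> multicastStrategy) {
    // assumes a constructor taking the strategy, as the multicastStrategy field in this PR suggests
    return new PublisherCache<>(multicastStrategy);
}

// e.g. the caller picks the variant:
PublisherCache<String, Integer> cache = PublisherCache.create(p -> p.multicast(1));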

Contributor Author:

I did consider this. I'm open to removing these; the create() method just above is essentially what you describe: the user would specify the Publisher, configured however they desire, in the function passed to get.

I had initially built this without the static methods, with a constructor that took a function used to supply a new Publisher on a cache miss. I decided to move this function to the get method because it matches how I might expect to use a cache, e.g. I might not want a function from name -> Publisher, but rather a closure that lets me use the context at hand to instantiate the new object.

For an initial API I don't have any problem deferring to your suggestion as we learn how this ultimately ends up being used.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;
Member:

Should this go into io.servicetalk.concurrent.api.internal for now?
