Skip to content

Commit d6031c6

Browse files
authoredSep 29, 2023
fix: Merge Watch & Local events. Switch by uid & type grouping. (#616)
Watcher and Local events are now merged to produce a sequence containing all captured events instead of just the sequence that was last produced. Events will still be switch but only at group level, grouping by resource Uid and sub-grouping by event type, therefore Keeping always last event for each type of each resources, preventing events accumulation. Fixes #585 #579
1 parent 1602b1c commit d6031c6

File tree

5 files changed

+59
-8
lines changed

5 files changed

+59
-8
lines changed
 

‎src/KubeOps/Operator/Caching/IResourceCache.cs

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ internal interface IResourceCache<TEntity>
1010

1111
TEntity Upsert(TEntity resource, out CacheComparisonResult result);
1212

13+
bool Exists(TEntity resource);
14+
1315
void Fill(IEnumerable<TEntity> resources);
1416

1517
void Remove(TEntity resource);

‎src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public void Clear()
7474
_metrics.CachedItemsSummary.Observe(_cache.Count);
7575
}
7676

77+
public bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);
78+
7779
private CacheComparisonResult CompareCache(TEntity resource)
7880
{
7981
if (!Exists(resource))
@@ -101,8 +103,6 @@ private CacheComparisonResult CompareCache(TEntity resource)
101103
return CacheComparisonResult.Other;
102104
}
103105

104-
private bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);
105-
106106
private void Remove(string resourceUid)
107107
{
108108
_cache.TryRemove(resourceUid, out _);

‎src/KubeOps/Operator/Controller/EventQueue.cs

+4-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ public EventQueue(
4949
.GroupBy(e => e.Resource.Uid())
5050
.Select(
5151
group => group
52-
.Select(ProcessDelay)
53-
.Switch())
52+
.GroupBy(e => e.Type)
53+
.Select(typedGroup => typedGroup
54+
.Select(ProcessDelay).Switch())
55+
.Merge())
5456
.Merge()
5557
.Select(UpdateResourceData)
5658
.Merge()

‎src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs

+14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using k8s.Models;
99

1010
using KubeOps.KubernetesClient;
11+
using KubeOps.Operator.Caching;
1112
using KubeOps.Operator.DevOps;
1213

1314
namespace KubeOps.Operator.Kubernetes;
@@ -21,6 +22,7 @@ internal class ResourceWatcher<TEntity> : IDisposable, IResourceWatcher<TEntity>
2122
private readonly IKubernetesClient _client;
2223
private readonly ILogger<ResourceWatcher<TEntity>> _logger;
2324
private readonly IResourceWatcherMetrics<TEntity> _metrics;
25+
private readonly IResourceCache<TEntity> _resourceCache;
2426
private readonly OperatorSettings _settings;
2527
private readonly Subject<TimeSpan> _reconnectHandler = new();
2628
private readonly IDisposable _reconnectSubscription;
@@ -34,11 +36,13 @@ public ResourceWatcher(
3436
IKubernetesClient client,
3537
ILogger<ResourceWatcher<TEntity>> logger,
3638
IResourceWatcherMetrics<TEntity> metrics,
39+
IResourceCache<TEntity> resourceCache,
3740
OperatorSettings settings)
3841
{
3942
_client = client;
4043
_logger = logger;
4144
_metrics = metrics;
45+
_resourceCache = resourceCache;
4246
_settings = settings;
4347
_reconnectSubscription =
4448
_reconnectHandler
@@ -131,6 +135,16 @@ private void OnWatcherEvent(WatchEventType type, TEntity resource)
131135

132136
_metrics.WatchedEvents.Inc();
133137

138+
if (_resourceCache.Exists(resource) && type == WatchEventType.Added)
139+
{
140+
_logger.LogTrace(
141+
@"The resource ""{kind}/{name}"" binded to the watcher event already exist in cache. Skipping ""{watchEvent}"" event",
142+
resource.Kind,
143+
resource.Name(),
144+
type);
145+
return;
146+
}
147+
134148
switch (type)
135149
{
136150
case WatchEventType.Added:

‎tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs

+37-4
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99

1010
using KubeOps.KubernetesClient;
1111
using KubeOps.Operator;
12+
using KubeOps.Operator.Caching;
1213
using KubeOps.Operator.DevOps;
1314
using KubeOps.Operator.Kubernetes;
15+
using KubeOps.Test.TestEntities;
16+
using KubeOps.TestOperator.Entities;
1417

1518
using Microsoft.Extensions.Logging.Abstractions;
1619
using Microsoft.Reactive.Testing;
@@ -35,6 +38,10 @@ public class TestResource : IKubernetesObject<V1ObjectMeta>
3538

3639
private readonly Mock<IKubernetesClient> _client = new();
3740
private readonly Mock<IResourceWatcherMetrics<TestResource>> _metrics = new();
41+
private readonly Mock<IResourceCache<TestResource>> _resourceCacheMock;
42+
43+
public ResourceWatcherTest() =>
44+
_resourceCacheMock = new Mock<IResourceCache<TestResource>>(MockBehavior.Strict);
3845

3946
[Fact]
4047
public async Task Should_Restart_Watcher_On_Exception()
@@ -50,6 +57,7 @@ public async Task Should_Restart_Watcher_On_Exception()
5057
_client.Object,
5158
new NullLogger<ResourceWatcher<TestResource>>(),
5259
_metrics.Object,
60+
_resourceCacheMock.Object,
5361
settings);
5462

5563
var testScheduler = new TestScheduler();
@@ -105,6 +113,7 @@ public async Task Should_Not_Throw_Overflow_Exception()
105113
_client.Object,
106114
new NullLogger<ResourceWatcher<TestResource>>(),
107115
_metrics.Object,
116+
_resourceCacheMock.Object,
108117
settings);
109118

110119
var testScheduler = new TestScheduler();
@@ -139,7 +148,12 @@ public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_
139148
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
140149
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
141150

142-
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
151+
using var resourceWatcher = new ResourceWatcher<TestResource>(
152+
_client.Object,
153+
new NullLogger<ResourceWatcher<TestResource>>(),
154+
_metrics.Object,
155+
_resourceCacheMock.Object,
156+
settings);
143157

144158
await resourceWatcher.StartAsync();
145159

@@ -166,7 +180,12 @@ public async Task Should_Not_Restart_On_Serialization_Exception()
166180
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
167181
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
168182

169-
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
183+
using var resourceWatcher = new ResourceWatcher<TestResource>(
184+
_client.Object,
185+
new NullLogger<ResourceWatcher<TestResource>>(),
186+
_metrics.Object,
187+
_resourceCacheMock.Object,
188+
settings);
170189

171190
await resourceWatcher.StartAsync();
172191

@@ -188,7 +207,12 @@ public async Task Should_Be_Restarted_After_TaskCanceledException_IOException()
188207

189208
SetupResourceWatcherMetrics();
190209

191-
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
210+
using var resourceWatcher = new ResourceWatcher<TestResource>(
211+
_client.Object,
212+
new NullLogger<ResourceWatcher<TestResource>>(),
213+
_metrics.Object,
214+
_resourceCacheMock.Object,
215+
settings);
192216

193217
await resourceWatcher.StartAsync();
194218

@@ -212,6 +236,9 @@ public async Task Should_Publish_On_Watcher_Event()
212236

213237
Action<WatchEventType, TestResource> onWatcherEvent = null!;
214238

239+
_resourceCacheMock.Setup(c => c.Exists(It.IsAny<TestResource>()))
240+
.Returns(false);
241+
215242
_client.Setup(
216243
c => c.Watch(
217244
It.IsAny<TimeSpan>(),
@@ -234,6 +261,7 @@ public async Task Should_Publish_On_Watcher_Event()
234261
_client.Object,
235262
new NullLogger<ResourceWatcher<TestResource>>(),
236263
_metrics.Object,
264+
_resourceCacheMock.Object,
237265
settings);
238266

239267
var watchEvents = resourceWatcher.WatchEvents.Replay(1);
@@ -273,7 +301,12 @@ public async Task Should_Restart_Watcher_On_Close()
273301
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
274302
_metrics.Setup(c => c.WatcherClosed).Returns(Mock.Of<ICounter>());
275303

276-
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
304+
using var resourceWatcher = new ResourceWatcher<TestResource>(
305+
_client.Object,
306+
new NullLogger<ResourceWatcher<TestResource>>(),
307+
_metrics.Object,
308+
_resourceCacheMock.Object,
309+
settings);
277310

278311
await resourceWatcher.StartAsync();
279312

0 commit comments

Comments
 (0)
Please sign in to comment.