|
87 | 87 | import io.opentelemetry.api.common.Attributes;
|
88 | 88 | import io.opentelemetry.api.common.AttributesBuilder;
|
89 | 89 | import io.opentelemetry.api.metrics.Meter;
|
| 90 | +import java.io.PrintWriter; |
| 91 | +import java.io.StringWriter; |
90 | 92 | import java.util.ArrayList;
|
91 | 93 | import java.util.HashSet;
|
92 | 94 | import java.util.Iterator;
|
@@ -577,6 +579,7 @@ public PooledSessionFuture replaceSession(
|
577 | 579 | numSessionsInUse--;
|
578 | 580 | numSessionsReleased++;
|
579 | 581 | checkedOutSessions.remove(session);
|
| 582 | + markedCheckedOutSessions.remove(session); |
580 | 583 | }
|
581 | 584 | session.leakedException = null;
|
582 | 585 | invalidateSession(session.get());
|
@@ -1333,6 +1336,9 @@ void clearLeakedException() {
|
1333 | 1336 | private void markCheckedOut() {
|
1334 | 1337 | if (options.isTrackStackTraceOfSessionCheckout()) {
|
1335 | 1338 | this.leakedException = new LeakedSessionException();
|
| 1339 | + synchronized (SessionPool.this.lock) { |
| 1340 | + SessionPool.this.markedCheckedOutSessions.add(this); |
| 1341 | + } |
1336 | 1342 | }
|
1337 | 1343 | }
|
1338 | 1344 |
|
@@ -1526,6 +1532,7 @@ public ApiFuture<Empty> asyncClose() {
|
1526 | 1532 | synchronized (lock) {
|
1527 | 1533 | leakedException = null;
|
1528 | 1534 | checkedOutSessions.remove(this);
|
| 1535 | + markedCheckedOutSessions.remove(this); |
1529 | 1536 | }
|
1530 | 1537 | }
|
1531 | 1538 | return ApiFutures.immediateFuture(Empty.getDefaultInstance());
|
@@ -2347,7 +2354,8 @@ private PooledSession pollUninterruptiblyWithTimeout(
|
2347 | 2354 | "Timed out after waiting "
|
2348 | 2355 | + acquireSessionTimeout.toMillis()
|
2349 | 2356 | + "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
|
2350 |
| - + " or increase the number of sessions in the session pool."); |
| 2357 | + + " or increase the number of sessions in the session pool.\n" |
| 2358 | + + createCheckedOutSessionsStackTraces()); |
2351 | 2359 | if (waiter.setException(exception)) {
|
2352 | 2360 | // Only throw the exception if setting it on the waiter was successful. The
|
2353 | 2361 | // waiter.setException(..) method returns false if some other thread in the meantime
|
@@ -2794,6 +2802,9 @@ enum Position {
|
2794 | 2802 | @VisibleForTesting
|
2795 | 2803 | final Set<PooledSessionFuture> checkedOutSessions = new HashSet<>();
|
2796 | 2804 |
|
| 2805 | + @GuardedBy("lock") |
| 2806 | + private final Set<PooledSessionFuture> markedCheckedOutSessions = new HashSet<>(); |
| 2807 | + |
2797 | 2808 | private final SessionConsumer sessionConsumer = new SessionConsumerImpl();
|
2798 | 2809 |
|
2799 | 2810 | private final MultiplexedSessionInitializationConsumer multiplexedSessionInitializationConsumer =
|
@@ -3012,6 +3023,13 @@ int getNumberOfSessionsInUse() {
|
3012 | 3023 | }
|
3013 | 3024 | }
|
3014 | 3025 |
|
| 3026 | + @VisibleForTesting |
| 3027 | + int getMaxSessionsInUse() { |
| 3028 | + synchronized (lock) { |
| 3029 | + return maxSessionsInUse; |
| 3030 | + } |
| 3031 | + } |
| 3032 | + |
3015 | 3033 | @VisibleForTesting
|
3016 | 3034 | double getRatioOfSessionsInUse() {
|
3017 | 3035 | synchronized (lock) {
|
@@ -3266,22 +3284,54 @@ private void incrementNumSessionsInUse(boolean isMultiplexed) {
|
3266 | 3284 |
|
3267 | 3285 | private void maybeCreateSession() {
|
3268 | 3286 | ISpan span = tracer.getCurrentSpan();
|
| 3287 | + boolean throwResourceExhaustedException = false; |
3269 | 3288 | synchronized (lock) {
|
3270 | 3289 | if (numWaiters() >= numSessionsBeingCreated) {
|
3271 | 3290 | if (canCreateSession()) {
|
3272 | 3291 | span.addAnnotation("Creating sessions");
|
3273 | 3292 | createSessions(getAllowedCreateSessions(options.getIncStep()), false);
|
3274 | 3293 | } else if (options.isFailIfPoolExhausted()) {
|
3275 |
| - span.addAnnotation("Pool exhausted. Failing"); |
3276 |
| - // throw specific exception |
3277 |
| - throw newSpannerException( |
3278 |
| - ErrorCode.RESOURCE_EXHAUSTED, |
3279 |
| - "No session available in the pool. Maximum number of sessions in the pool can be" |
3280 |
| - + " overridden by invoking SessionPoolOptions#Builder#setMaxSessions. Client can be made to block" |
3281 |
| - + " rather than fail by setting SessionPoolOptions#Builder#setBlockIfPoolExhausted."); |
| 3294 | + throwResourceExhaustedException = true; |
| 3295 | + } |
| 3296 | + } |
| 3297 | + } |
| 3298 | + if (!throwResourceExhaustedException) { |
| 3299 | + return; |
| 3300 | + } |
| 3301 | + span.addAnnotation("Pool exhausted. Failing"); |
| 3302 | + |
| 3303 | + String message = |
| 3304 | + "No session available in the pool. Maximum number of sessions in the pool can be" |
| 3305 | + + " overridden by invoking SessionPoolOptions#Builder#setMaxSessions. Client can be made to block" |
| 3306 | + + " rather than fail by setting SessionPoolOptions#Builder#setBlockIfPoolExhausted.\n" |
| 3307 | + + createCheckedOutSessionsStackTraces(); |
| 3308 | + throw newSpannerException(ErrorCode.RESOURCE_EXHAUSTED, message); |
| 3309 | + } |
| 3310 | + |
| 3311 | + private StringBuilder createCheckedOutSessionsStackTraces() { |
| 3312 | + List<PooledSessionFuture> currentlyCheckedOutSessions; |
| 3313 | + synchronized (lock) { |
| 3314 | + currentlyCheckedOutSessions = new ArrayList<>(this.markedCheckedOutSessions); |
| 3315 | + } |
| 3316 | + |
| 3317 | + // Create the error message without holding the lock, as we are potentially looping through a |
| 3318 | + // large set, and analyzing a large number of stack traces. |
| 3319 | + StringBuilder stackTraces = |
| 3320 | + new StringBuilder( |
| 3321 | + "There are currently " |
| 3322 | + + currentlyCheckedOutSessions.size() |
| 3323 | + + " sessions checked out:\n\n"); |
| 3324 | + if (options.isTrackStackTraceOfSessionCheckout()) { |
| 3325 | + for (PooledSessionFuture session : currentlyCheckedOutSessions) { |
| 3326 | + if (session.leakedException != null) { |
| 3327 | + StringWriter writer = new StringWriter(); |
| 3328 | + PrintWriter printWriter = new PrintWriter(writer); |
| 3329 | + session.leakedException.printStackTrace(printWriter); |
| 3330 | + stackTraces.append(writer).append("\n\n"); |
3282 | 3331 | }
|
3283 | 3332 | }
|
3284 | 3333 | }
|
| 3334 | + return stackTraces; |
3285 | 3335 | }
|
3286 | 3336 |
|
3287 | 3337 | private void releaseSession(Tuple<PooledSession, Integer> sessionWithPosition) {
|
|
0 commit comments