Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
juliuszsompolski committed Jul 31, 2023
1 parent b84bbc7 commit 8dc3207
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ private[client] class CustomSparkConnectBlockingStub(
}
}

def executePlanReattachable(request: ExecutePlanRequest)
: java.util.Iterator[ExecutePlanResponse] = {
def executePlanReattachable(
request: ExecutePlanRequest): java.util.Iterator[ExecutePlanResponse] = {
GrpcExceptionConverter.convert {
GrpcExceptionConverter.convertIterator[ExecutePlanResponse](
// Don't use retryHandler - own retry handling is inside.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ExecutePlanResponseReattachableIterator(
private val initialRequest: proto.ExecutePlanRequest = request
.toBuilder()
.addRequestOptions(
proto.ExecutePlanRequest.RequestOption.newBuilder()
proto.ExecutePlanRequest.RequestOption
.newBuilder()
.setReattachOptions(proto.ReattachOptions.newBuilder().setReattachable(true).build())
.build())
.setOperationId(operationId)
Expand Down Expand Up @@ -122,14 +123,12 @@ class ExecutePlanResponseReattachableIterator(

private def releaseExecute(untilResponseId: Option[String]) = {
val request = createReleaseExecuteRequest(untilResponseId)
rawAsyncStub.releaseExecute(
request,
createRetryingReleaseExecuteResponseObserer(request)
)
rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request))
}

private def createReattachExecuteRequest() = {
val reattach = proto.ReattachExecuteRequest.newBuilder()
val reattach = proto.ReattachExecuteRequest
.newBuilder()
.setSessionId(initialRequest.getSessionId)
.setUserContext(initialRequest.getUserContext)
.setOperationId(initialRequest.getOperationId)
Expand All @@ -145,7 +144,8 @@ class ExecutePlanResponseReattachableIterator(
}

private def createReleaseExecuteRequest(untilResponseId: Option[String]) = {
val release = proto.ReleaseExecuteRequest.newBuilder()
val release = proto.ReleaseExecuteRequest
.newBuilder()
.setSessionId(initialRequest.getSessionId)
.setUserContext(initialRequest.getUserContext)
.setOperationId(initialRequest.getOperationId)
Expand All @@ -156,18 +156,22 @@ class ExecutePlanResponseReattachableIterator(

untilResponseId match {
case None =>
release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_ALL)
release.setReleaseAll(proto.ReleaseExecuteRequest.ReleaseAll.newBuilder().build())
case Some(responseId) =>
release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_UNTIL_RESPONSE)
release.setUntilResponseId(responseId)
release
.setReleaseUntil(
proto.ReleaseExecuteRequest.ReleaseUntil
.newBuilder()
.setResponseId(responseId)
.build())
}

release.build()
}

private def createRetryingReleaseExecuteResponseObserer(
requestForRetry: proto.ReleaseExecuteRequest, currentRetryNum: Int = 0)
: StreamObserver[proto.ReleaseExecuteResponse] = {
requestForRetry: proto.ReleaseExecuteRequest,
currentRetryNum: Int = 0): StreamObserver[proto.ReleaseExecuteResponse] = {
new StreamObserver[proto.ReleaseExecuteResponse] {
override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
override def onCompleted(): Unit = {}
Expand All @@ -176,7 +180,8 @@ class ExecutePlanResponseReattachableIterator(
Thread.sleep(
(retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
.pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
rawAsyncStub.releaseExecute(requestForRetry,
rawAsyncStub.releaseExecute(
requestForRetry,
createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))
case _ =>
logWarning(s"ReleaseExecute failed with exception: $t.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler
}

private[client] object GrpcRetryHandler extends Logging {

/**
* Retries the given function with exponential backoff according to the client's retryPolicy.
* @param retryPolicy
Expand All @@ -161,8 +162,9 @@ private[client] object GrpcRetryHandler extends Logging {
return fn
} catch {
case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
logWarning(s"Non fatal error during RPC execution: $e, " +
s"retrying (currentRetryNum=$currentRetryNum)")
logWarning(
s"Non fatal error during RPC execution: $e, " +
s"retrying (currentRetryNum=$currentRetryNum)")
Thread.sleep(
(retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
.pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
}

override def reattachExecute(
request: proto.ReattachExecuteRequest,
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
request: proto.ReattachExecuteRequest,
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
// Reply with a dummy response using the same client ID
val requestSessionId = request.getSessionId
val response = ExecutePlanResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,15 @@ object SparkConnectServerUtils {
Seq("--conf", s"spark.sql.catalogImplementation=$catalogImplementation")
}

jarsConfigs ++ writerV2Configs ++ hiveTestConfigs
// Make the server terminate reattachable streams every 1 second and 123 bytes,
// to make the tests exercise reattach.
val reattachExecuteConfigs = Seq(
"--conf",
"spark.connect.execute.reattachable.senderMaxStreamDuration=1s",
"--conf",
"spark.connect.execute.reattachable.senderMaxStreamSize=123")

jarsConfigs ++ writerV2Configs ++ hiveTestConfigs ++ reattachExecuteConfigs
}

def start(): Unit = {
Expand Down
32 changes: 14 additions & 18 deletions connector/connect/common/src/main/protobuf/spark/connect/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -749,28 +749,24 @@ message ReleaseExecuteRequest {
// logging purposes and will not be interpreted by the server.
optional string client_type = 4;

ReleaseType release_type = 5;
// Release and close operation completely.
// Note: This should be called when the server side operation is finished, and ExecutePlan or
// ReattachExecute are finished processing the result stream, or inside onComplete / onError.
// This will not interrupt a running execution, but block until it's finished.
message ReleaseAll {}

enum ReleaseType {
RELEASE_TYPE_UNSPECIFIED = 0;

// Release and close operation completely.
// Note: This should be called when the server side operation is finished, and ExecutePlan or
// ReattachExecute are finished processing the result stream, or inside onComplete / onError.
RELEASE_ALL = 1;

// Release all responses from the operation response stream up to and including
// the response given by until_response_id.
// While server determines by itself how much of a buffer of responses to keep, client providing
// explicit release calls will help reduce resource consumption.
// Noop if response_id not found in cached responses.
RELEASE_UNTIL_RESPONSE = 2;
// Release all responses from the operation response stream up to and including
// the response with the given response_id.
// While server determines by itself how much of a buffer of responses to keep, client providing
// explicit release calls will help reduce resource consumption.
// Noop if response_id not found in cached responses.
message ReleaseUntil {
string response_id = 1;
}

oneof release {
// if release_type == RELEASE_UNTIL_RESPONSE, the response_id of the response up until and
// including which to release.
string until_response_id = 6;
ReleaseAll release_all = 5;
ReleaseUntil release_until = 6;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ object Connect {
"automatically completed and client needs to send a new ReattachExecute RPC to continue. " +
"Set to 0 for unlimited.")
.version("3.5.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(1) // todo for testing
// .createWithDefault(5 * 60)
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")

val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE =
ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamSize")
Expand All @@ -94,8 +93,7 @@ object Connect {
"ReattachExecute RPC to continue. Set to 0 for unlimited.")
.version("3.5.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("150") // todo for testing
// .createWithDefaultString("1g")
.createWithDefaultString("1g")

val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE =
ConfigBuilder("spark.connect.execute.reattachable.observerRetryBufferSize")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
} else {
val confSize =
SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
if (confSize > 0) System.currentTimeMillis() + 1000 * confSize else Long.MaxValue
if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
}

// Maximum total size of responses. The response which tips over this threshold will be sent.
Expand Down Expand Up @@ -151,9 +151,10 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
logDebug(s"Reacquired executionObserver lock after waiting.")
}
}
logDebug(s"Exiting loop: detached=$detached, response=$response, " +
s"lastIndex=${executionObserver.getLastResponseIndex()}, " +
s"deadline=${deadlineLimitReached}")
logDebug(
s"Exiting loop: detached=$detached, response=$response, " +
s"lastIndex=${executionObserver.getLastResponseIndex()}, " +
s"deadline=${deadlineLimitReached}")
}

// Process the outcome of the inner loop.
Expand Down Expand Up @@ -195,15 +196,16 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
}

/**
* Send the response to the grpcCallObserver.
* In reattachable execution, we control the flow, and only pass the response to the
* grpcCallObserver when it's ready to send.
* Otherwise, grpcCallObserver.onNext() would return in a non-blocking way, but could queue
* responses without sending them if the client doesn't keep up receiving them.
* When pushing more responses to onNext(), there is no insight how far behind the service is
* in actually sending them out.
* @param deadlineTimeMillis when reattachable, wait for ready stream until this deadline.
* @return true if the response was sent, false otherwise (meaning deadline passed)
* Send the response to the grpcCallObserver. In reattachable execution, we control the flow,
* and only pass the response to the grpcCallObserver when it's ready to send. Otherwise,
* grpcCallObserver.onNext() would return in a non-blocking way, but could queue responses
* without sending them if the client doesn't keep up receiving them. When pushing more
* responses to onNext(), there is no insight into how far behind the service is in actually sending
* them out.
* @param deadlineTimeMillis
* when reattachable, wait for ready stream until this deadline.
* @return
* true if the response was sent, false otherwise (meaning deadline passed)
*/
private def sendResponse(response: T, deadlineTimeMillis: Long): Boolean = {
if (!executeHolder.reattachable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,13 @@ class SparkConnectReleaseExecuteHandler(
messageParameters = Map.empty)
}

v.getReleaseType match {
case proto.ReleaseExecuteRequest.ReleaseType.RELEASE_ALL =>
executeHolder.close()
case proto.ReleaseExecuteRequest.ReleaseType.RELEASE_UNTIL_RESPONSE =>
if (!v.hasUntilResponseId) {
throw new IllegalArgumentException(
s"RELEASE_UNTIL_RESPONSE requested, but no until_response_id provided.")
}
executeHolder.releaseUntilResponseId(v.getUntilResponseId)
case other =>
throw new UnsupportedOperationException(s"Unknown ReleaseType $other!")
if (v.hasReleaseAll) {
executeHolder.close()
} else if (v.hasReleaseUntil) {
val responseId = v.getReleaseUntil.getResponseId
executeHolder.releaseUntilResponseId(responseId)
} else {
throw new UnsupportedOperationException(s"Unknown ReleaseExecute type!")
}

val response = proto.ReleaseExecuteResponse
Expand Down
18 changes: 10 additions & 8 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

0 comments on commit 8dc3207

Please sign in to comment.