Skip to content

Commit

Permalink
Fixed a bug in reserve one ride, but still not fixed in reserve path.
Browse files Browse the repository at this point in the history
The bug is actually in grpc and not our code, described here:
grpc/grpc-java#5882

using bidirectional streaming we can not invoke "onCompleted" twice (needed by client & server),
 Note: An exception to this is calling onCompleted from inside onCompleted (this works and this is the temp fix for "one ride reserver", however in our user case this is not relevant as we need to open async requests)

 Our bug:
 Once the server finds a ride and trigger the client's onComplete, the client can not trigger back onComplete for the server, which leads to the semaphore in the server to not to be released, we should think of other way implementing this.

One idea is to not relay on the "onCompleted" and use the actual streaming to sends messages between the client and server, thus the end of the stream can be known when we get a specific message. (via onNext())

If you want to do this, change the grpc method signature, to accept "RequestHeader" (implement it in protobuf), which contains a string message, and data, same for ResponseHeader,
  • Loading branch information
Emilyos committed Jan 19, 2021
1 parent ccb3eec commit 682ec79
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 125 deletions.
13 changes: 12 additions & 1 deletion build-scripts/servers/test_commands.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ curl -X POST -H "Content-Type: application/json" -d '{"first_name": "pam", "last

curl -X POST -H "Content-Type: application/json" -d '{"first_name": "oscar", "last_name": "sh", "path": "C,B", "departure_time": "15/02/2021"}' http://172.18.1.5:8080/reserveRide

curl -X POST -H "Content-Type: application/json" -d '{"first_name": "Roy", "last_name": "sh", "path": "A,C,B", "departure_time": "15/02/2021"}' http://172.18.1.5:8080/reserveRide

curl -X POST -H "Content-Type: application/json" -d '{"first_name": "dwight", "last_name": "sh", "path": "C,B", "departure_time": "15/02/2021"}' http://172.18.1.6:8080/reserveRide
#TODO: If a server got an exception we want it to die and no to be stuck, in order for the system to continue working.

curl "http://172.18.1.6:8080/snapshot"
#TODO: when reserving rides, if we ask the leader directly to reserve a ride that he is leader on then we get a response
#contains who reserved this ride before, however if don't ask the leader then we don't get this info because
#we don't add this info when building the protobuf for the ride. (When asking the leader we don't build a protobuf.
Expand All @@ -55,4 +58,12 @@ curl -X POST -H "Content-Type: application/json" -d '{"first_name": "dwight", "l
# he will be stuck because he initiates a latch with the nunmber of shards, and waits on it, the latch should,
# be initiated on the number of servers that I should contact, in this case only 1 since s2 is the only leader...

#TODO: when asking a leader to give rides,
#TODO: when asking a leader to give rides,

# TEST 1

curl -X POST -H "Content-Type: application/json" -d '{"first_name": "kevin", "last_name": "sh", "path": "A,B", "departure_time": "15/02/2021"}' http://172.18.1.5:8080/reserveRide
curl -X POST -H "Content-Type: application/json" -d '{"first_name": "jim", "last_name": "sh", "path": "C,B", "departure_time": "15/02/2021"}' http://172.18.1.5:8080/reserveRide


curl -X POST -H "Content-Type: application/json" -d '{"first_name": "Emil", "last_name": "Khshiboun", "phone": "0500000000", "start_position": "C", "end_position": "B", "departure_time": "15/02/2021", "vacancies": "4", "pd": "5"}' http://172.18.1.4:8080/publishRide
14 changes: 7 additions & 7 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ plugins {
}

def protobufVersion = "3.13.0"
def grpcVersion = "1.33.0"
def grpcVersion = "1.35.0"
def protocVersion = protobufVersion;

group 'org.ds'
Expand Down Expand Up @@ -51,15 +51,15 @@ dependencies {
// testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"

// testImplementation('org.springframework.boot:spring-boot-starter-test') {
// exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
// }
implementation 'io.grpc:grpc-netty-shaded:1.35.0'
implementation 'io.grpc:grpc-protobuf:1.35.0'
implementation 'io.grpc:grpc-stub:1.35.0'


// examples/advanced need this for JsonFormat
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"

}


Expand Down
39 changes: 13 additions & 26 deletions server/src/generated/main/grpc/generated/sscGrpc.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
package generated;

import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.futureUnaryCall;
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;

/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.33.0)",
value = "by gRPC proto compiler (version 1.35.0)",
comments = "Source: scheme.proto")
public final class sscGrpc {

Expand Down Expand Up @@ -172,42 +159,42 @@ public static abstract class sscImplBase implements io.grpc.BindableService {
*/
public void getRidesAsync(generated.emptyMessage request,
io.grpc.stub.StreamObserver<generated.Response> responseObserver) {
asyncUnimplementedUnaryCall(getGetRidesAsyncMethod(), responseObserver);
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getGetRidesAsyncMethod(), responseObserver);
}

/**
*/
public void addRideLeader(generated.ride request,
io.grpc.stub.StreamObserver<generated.Response> responseObserver) {
asyncUnimplementedUnaryCall(getAddRideLeaderMethod(), responseObserver);
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getAddRideLeaderMethod(), responseObserver);
}

/**
*/
public io.grpc.stub.StreamObserver<generated.reservation> reserveRides(
io.grpc.stub.StreamObserver<generated.ride> responseObserver) {
return asyncUnimplementedStreamingCall(getReserveRidesMethod(), responseObserver);
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getReserveRidesMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getGetRidesAsyncMethod(),
asyncServerStreamingCall(
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
generated.emptyMessage,
generated.Response>(
this, METHODID_GET_RIDES_ASYNC)))
.addMethod(
getAddRideLeaderMethod(),
asyncUnaryCall(
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
generated.ride,
generated.Response>(
this, METHODID_ADD_RIDE_LEADER)))
.addMethod(
getReserveRidesMethod(),
asyncBidiStreamingCall(
io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
new MethodHandlers<
generated.reservation,
generated.ride>(
Expand All @@ -234,23 +221,23 @@ protected sscStub build(
*/
public void getRidesAsync(generated.emptyMessage request,
io.grpc.stub.StreamObserver<generated.Response> responseObserver) {
asyncServerStreamingCall(
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getGetRidesAsyncMethod(), getCallOptions()), request, responseObserver);
}

/**
*/
public void addRideLeader(generated.ride request,
io.grpc.stub.StreamObserver<generated.Response> responseObserver) {
asyncUnaryCall(
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getAddRideLeaderMethod(), getCallOptions()), request, responseObserver);
}

/**
*/
public io.grpc.stub.StreamObserver<generated.reservation> reserveRides(
io.grpc.stub.StreamObserver<generated.ride> responseObserver) {
return asyncBidiStreamingCall(
return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
getChannel().newCall(getReserveRidesMethod(), getCallOptions()), responseObserver);
}
}
Expand All @@ -273,14 +260,14 @@ protected sscBlockingStub build(
*/
public java.util.Iterator<generated.Response> getRidesAsync(
generated.emptyMessage request) {
return blockingServerStreamingCall(
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getGetRidesAsyncMethod(), getCallOptions(), request);
}

/**
*/
public generated.Response addRideLeader(generated.ride request) {
return blockingUnaryCall(
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getAddRideLeaderMethod(), getCallOptions(), request);
}
}
Expand All @@ -303,7 +290,7 @@ protected sscFutureStub build(
*/
public com.google.common.util.concurrent.ListenableFuture<generated.Response> addRideLeader(
generated.ride request) {
return futureUnaryCall(
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getAddRideLeaderMethod(), getCallOptions()), request);
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/entities/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public int hashCode() {
}

public void shutdown() {
System.out.println("Shutting down server ...");
heartbeat_time = -1;
if (grpc_client != null) {
grpc_client.shutdown();
}
Expand Down
41 changes: 28 additions & 13 deletions server/src/main/java/grpc/sscClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import management.MessagesManager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.logging.Level;


public class sscClient {
private static final Logger logger = Logger.getLogger(sscClient.class.getName());
private static final MessagesManager logger = MessagesManager.instance;
private static final emptyMessage EMPTY_MESSAGE = emptyMessage.newBuilder().build();
private final sscGrpc.sscBlockingStub blockingStub;
private final sscGrpc.sscStub asyncStub;
private final ManagedChannel channel;

StreamObserver<reservation> request_stream;

public sscClient(String address) {
channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
Expand All @@ -41,7 +40,8 @@ public Ride addRideLeader(Ride msg) {
System.out.println("Got Ride: " + msg);
System.out.println("Transformed to ride: " + request);

String new_ride = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS).addRideLeader(request).getMsg(); // @TODO: decide how to use deadline for the retries with different leaders... exponential backoff until a threshold of 1 second for example.
// String new_ride = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS).addRideLeader(request).getMsg(); // @TODO: decide how to use deadline for the retries with different leaders... exponential backoff until a threshold of 1 second for example.
String new_ride = blockingStub.addRideLeader(request).getMsg();
JsonObject obj = JsonParser.parseString(new_ride).getAsJsonObject();
return Ride.deserialize(obj);
}
Expand Down Expand Up @@ -69,36 +69,51 @@ public void onCompleted() {
});
}

public Ride blockingReserveRide(reservation reservation) {
public Ride blockingReserveRide(reservation reserve) {
final Ride[] reserved_ride = new Ride[1];
reserved_ride[0] = Ride.nullRide();
CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<reservation> request_stream = asyncStub.reserveRides(new StreamObserver<ride>() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
request_stream = asyncStub.reserveRides(new StreamObserver<ride>() {
@Override
public void onNext(ride value) {
//TODO: this is temp fix, we should implement proper Request/Response Header in protbuf
if (value.getFirstName().equals("onCompleted")) {
countDownLatch.countDown();
request_stream.onCompleted();
return;
}
logger.log(Level.FINER, "Got onNext from server!");
reserved_ride[0] = new Ride(value);
}

@Override
public void onError(Throwable t) {
countDownLatch.countDown();
logger.log(Level.SEVERE, "Got onError from server! message: " + t.getMessage());
}

@Override
public void onCompleted() {
countDownLatch.countDown();
logger.log(Level.FINER, "Got onCompleted from server!");

}
});

try {
// send reserve request
request_stream.onNext(reservation);
// wait for reservation
logger.log(Level.FINE, "Sending stream request to server to reserve one ride if available..");
request_stream.onNext(reserve);
// request_stream.onCompleted(); // TEMP FIX for one ride reservation.

countDownLatch.await();
return reserved_ride[0];

} catch (RuntimeException e) {
logger.log(Level.WARNING, "Runtime error..." + e.getMessage());
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
request_stream.onCompleted();
return reserved_ride[0];
}

Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/grpc/sscService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public void addRideLeader(ride request, StreamObserver<Response> responseObserve

@Override
public StreamObserver<reservation> reserveRides(StreamObserver<ride> responseObserver) {
ReserveRequestHandler newHandler = null;
try {
newHandler = ReserveRequestHandler.getNewHandler(responseObserver);
ReserveRequestHandler newHandler = ReserveRequestHandler.getNewHandler(responseObserver);
return newHandler;
} catch (InterruptedException e) {
//TODO: catch this focking exception by order of the peaky blinders
System.out.println("oops..." + e.getMessage());
}
return newHandler;
return null;
}
}
7 changes: 5 additions & 2 deletions server/src/main/java/management/MessagesManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.google.gson.JsonPrimitive;
import entities.Ride;
import entities.Server;
import org.apache.jute.compiler.JString;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -21,7 +20,7 @@ public class MessagesManager {
public static final String ROOT_LOG_NAME = "ds-hw2";
public static final Level LOG_LEVEL = Level.FINER;
public static final Logger ROOT_LOGGER = Logger.getLogger(ROOT_LOG_NAME);

public static final MessagesManager logger = MessagesManager.instance;
private static final String OPERATION = "operation";
private static final String DATA = "data";

Expand All @@ -33,10 +32,14 @@ public static void ProcessMessage(byte[] message) {
JsonObject json_message = JsonParser.parseString(data_string).getAsJsonObject();
JsonElement operation = json_message.get(OPERATION);
JsonElement data = json_message.get(DATA);
logger.log(Level.FINE, "Processing message...");
if (operation != null && data != null) {
JsonObject ride_as_json = data.getAsJsonObject();
Ride received_ride = Ride.deserialize(ride_as_json);
logger.log(Level.FINE, "Operation " + operation.getAsString());
logger.log(Level.FINE, "Ride " + received_ride.toString());
if (operation.getAsString().equals(OPERATION_NEW_RIDE)) {

ServerManager.getInstance().addRide(received_ride);
} else if (operation.getAsString().equals(OPERATION_UPDATE_RIDE)) {
ServerManager.getInstance().updateRide(received_ride);
Expand Down

0 comments on commit 682ec79

Please sign in to comment.