Skip to content

Commit

Permalink
update streaming simulation demo
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronwuus committed Feb 27, 2021
1 parent 0689c67 commit 9bc10cf
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 2 deletions.
61 changes: 60 additions & 1 deletion StreamServer/Hubs/StreamingHub.cs
@@ -1,13 +1,20 @@
using Grpc.Net.Client;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.AspNetCore.SignalR;
using Streamer;
using StreamServer.Models;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace StreamServer.Hubs
{
Expand Down Expand Up @@ -37,5 +44,57 @@ public async Task SendImage(int index, int x, int y, int w, int h, bool stop)
}
}
}
static Subject<Streamer> StreamerSubject = new Subject<Streamer>();

private static readonly Dictionary<int, bool> RoomHasSubscriber = new Dictionary<int, bool>();
public async Task Subscribe(int index, int x, int y, int w, int h, bool stop)
{
var subscription = StreamerSubject.Subscribe(
async x =>
{
if(x.Index == index)
{
var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
var imgData = "data:image/gif;base64," + base64Data;
await Clients.Caller.SendAsync("ReceiveImage", imgData);
}
},
() => Console.WriteLine("Sequence Completed."));
if (!RoomHasSubscriber.ContainsKey(index))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler }))
{
var request = new StreamRequest();
request.X = x;
request.Y = y;
request.W = w;
request.H = h;
var client = new Greeter.GreeterClient(channel);
CancellationTokenSource cts = new CancellationTokenSource();
var sb = new StringBuilder();
var streamServer = client.StreamingServer(request, cancellationToken: cts.Token);
RoomHasSubscriber[index] = true;
while (await streamServer.ResponseStream.MoveNext(cts.Token))
{
StreamerSubject.OnNext(new Streamer { Index = index, Image = streamServer.ResponseStream.Current.Image });
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
{
await Task.Delay(1000);
}
await Task.Delay(-1);
}
}
public class Streamer
{
public int Index { get; set; }
public ByteString Image { get; set; }
}
}
4 changes: 4 additions & 0 deletions StreamServer/StreamServer.csproj
Expand Up @@ -11,6 +11,10 @@
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.35.0" />
<PackageReference Include="System.Drawing.Common" Version="4.7.2" />
<PackageReference Include="System.Reactive" Version="4.4.1" />
<PackageReference Include="System.Reactive.Core" Version="4.4.1" />
<PackageReference Include="System.Reactive.Linq" Version="4.4.1" />
<PackageReference Include="System.Reactive.PlatformServices" Version="4.4.1" />
</ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\greet.proto" GrpcServices="Client" />
Expand Down
3 changes: 3 additions & 0 deletions StreamServer/Views/Home/Index.cshtml
Expand Up @@ -10,6 +10,9 @@
</div>
</div>
<div class="col-6">
<div class="row">
<label for="index">room:</label> <input type="text" id="index" placeholder="room" />
</div>
<div class="row">
<div class="col-3">
<label for="x">X:</label> <input type="text" id="x" placeholder="X" />
Expand Down
3 changes: 2 additions & 1 deletion StreamServer/wwwroot/js/site.js
Expand Up @@ -12,11 +12,12 @@ connection.start().then(function () {
});

document.getElementById("start").addEventListener("click", function (event) {
var i = document.getElementById("index").value;
var x = document.getElementById("x").value;
var y = document.getElementById("y").value;
var w = document.getElementById("w").value;
var h = document.getElementById("h").value;
connection.invoke("SendImage", 0, parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) {
connection.invoke("Subscribe", parseInt(i), parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) {
return console.error(err.toString());
});
event.preventDefault();
Expand Down

0 comments on commit 9bc10cf

Please sign in to comment.