Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hprose 推送功能的改进 #8

Open
andot opened this issue May 9, 2018 · 15 comments
Open

Hprose 推送功能的改进 #8

andot opened this issue May 9, 2018 · 15 comments

Comments

@andot
Copy link
Member

andot commented May 9, 2018

Hprose 2.0 中增加了服务器推送的功能。但是该功能目前仍然有一些问题需要解决:

  1. 客户端超时必须要大于服务器端超时,否则,客户端可能会收不到或丢失部分服务器端推送的数据。

  2. 当客户端短时间内关闭并重启,且使用同一个客户端 id 来订阅推送主题时,客户端会收不到或丢失部分服务器端的推送数据。

  3. 当服务器对客户端进行高频数据推送时,客户端如果来不及响应,客户端会丢失部分服务器端的推送数据。

4、无法推送 null,无法推送异常。

5、无法快速退订主题。

要解决这些问题,就需要对现有实现做出修改,下面是如何修改的一些想法。

@andot
Copy link
Member Author

andot commented May 9, 2018

现有实现

在讨论修改之前,先说一下现有实现是如何设计的。

首先,服务器端通过 publish 方法(首字母大小写根据具体实现语言而定)来发布一个自动实现的推送方法。该方法主要有三个参数:topictimeoutheartbeat,不同语言在实现时,在参数的排列上和写法上可能有些区别。

topic 参数为推送的主题名,它将作为自动实现的推送方法的发布名称。

timeout 参数用来设置推送空闲超时。该属性默认值为 120000,单位是毫秒(ms),即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回响应:

Rnz

此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

heartbeat 参数用来设置推送的心跳检测间隔时间。该属性默认值为 3000,单位是毫秒(ms),即 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 heartbeat 时间内没有取走推送数据,则服务器端认为客户端已掉线。对于已掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

timeoutheartbeat 属性在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 heartbeat 时间可以检测到客户端以掉线。

timeoutheartbeat 设置的时间越短,检测到客户端离线的时间就越短。

但是需要注意的是,timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 timeout 设置的过短,否则会严重增加服务器的负担。

因此,timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 heartbeat 的时间设置的过长。

如果 heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

客户端在订阅请求时,会发起对推送主题的调用:

C<topic>a1{<id>}z

<topic> 是 Hprose 序列化的主题名,比如如果主题名为 "news",则 <topic> 为:

s4"news"

<id> 是 Hprose 序列化的表示客户端的唯一 id。比如如果 id 为 UUID 格式的数据 "AFA7F4B1-A64D-46FA-886F-EF6432AD69A3",则 <id> 为:

g{AFA7F4B1-A64D-46FA-886F-EF6432AD69A3} 

服务器通过客户端发起请求的这个连接来返回推送数据,例如:

R<data>z

为服务器的返回的 Hprose 序列化数据。例如如果推送的数据为字符串“hello”,则返回数据为:

Rs5"hello"z

当服务器端没有数据推送时,客户端发起请求的连接会一直保持跟服务器相连,直到客户端超时或者服务器端超时。如果客户端先超时,客户端会断开该连接,并通过新连接发起请求。如果服务器先超时,服务器会返回:

Rnz

客户端如果收到推送数据,则对订阅的回调方法进行回调,同时再次发起对订阅主题的请求。

如果客户端收到的是超时返回的 Rnz,则不对订阅的回调方法进行回调,直接再次发起对订阅主题的请求。

如果服务器返回异常,或者客户端和服务器之间发生网络异常,例如网络中断等,客户端会忽略异常,并再次发起对订阅的请求。

@andot
Copy link
Member Author

andot commented May 9, 2018

问题分析

在考虑解决方案之前,先来分析一下问题产生的原因。

前两个问题实际上是同一个问题,即当同一个客户端发起对同一个主题的二次请求时,服务器端没有一个好的处理方法的问题。

因为这时不管是直接返回 null,还是抛出异常,按照目前的设计,如果前一个客户端并没有断开连接,那么前一个客户端收到这个响应之后,就会重新发起请求,而这个重新发起的请求,就会迫使刚才发起请求的客户端再次返回 null 或者抛出异常,这会引起不断交替请求的问题。目前的解决方法时,客户端超时后,客户端断开连接,然后再发起新的请求,这样当服务器端返回对前一个请求的响应时,前一个客户端就无法收到了,也就不会出现交替请求的问题了。但是如果不是这种情况下,发起的二次请求,例如用户编程时重复订阅导致的二次请求,就无法避免这种交替请求的问题了。

第三个问题来自于当服务器端高频推送时,客户端每次只能返回一个推送结果,因此会导致服务器推送数据的堆积,为了防止心跳检测的时间设置过长,导致服务器推送数据堆积,只能将心跳检测时间缩短,而缩短导致的结果就是容易误判客户端掉线,一旦误判客户端掉线,服务器堆积的消息就会被清空,当客户端再次连上来时,清空的数据就无法被返回了。

第四个问题来自于 Rnz 作为了一个特殊值对待,并且异常没有区分服务器返回的异常还是网络中断导致的异常。

第五个问题来自于退订主题的实现,目前的实现方式仅仅是删除客户端所保存的关于订阅主题的处理方法。而服务器端要等待服务器超时和心跳超时之后,才能判断客户端是否已经退订了主题,也就是说当服务器端没有向客户端推送任何消息时,服务器端需要至少 timeout + heartbeat 的时间才能检测到客户端以离线,这在某种程度上来说是一个相当长的时间。

@andot
Copy link
Member Author

andot commented May 9, 2018

解决方案

要解决上面这些问题,需要对客户端和服务器都做修改。下面的解决方案可以解决上面所说的几个问题,但是会与之前的实现不兼容,因此,如果使用新版本的推送功能,需要客户端和服务器都进行升级才行。

首先,客户端请求格式不变。

服务器如果有数据返回,返回格式为:

Ra<n>{<data-1><data-2>...<data-n>}z

<n> 为推送的数据的个数,<data-1> 为推送的第 1 个数据对象,<data-2> 为推送的第 2 个数据对象,<data-n> 为推送的第 n 个数据对象。

当仅有一个数据被推送时,返回值为:

Ra1{<data>}z

这样如果服务器推送 null,返回是:

Ra1{n}z

客户端不需要特殊处理,也可以正常接收服务器端推送的 null 了。

当服务器返回第一个推送结果,而客户端新的请求还没有到达服务器端时,如果服务器端积压了多个推送结果,当客户端新的请求到达时,服务器端可以一次返回所有积压的推送数据。这样一方面减少了不必要的请求响应,另一方面减少了服务器端积压的推送数据,心跳检测时间就可以设置的稍长一点,这样就可以避免掉线误判导致推送数据丢失的问题了。

当服务器端没有数据,而服务器端超时时间已到时,服务器端返回:

Ra{}z

或者:

Rez

客户端收到这个空的数组结果后,重新提交请求。也就是用这个结果代替原来的 Rnz

当客户端超时,服务器端未超时,或者客户端关闭后又打开,又或者什么其它原因客户端在第一个订阅请求还未返回时,客户端发送了第二个订阅请求,服务器端对前一个请求返回:

Rnz

客户端如果能够收到该响应,则不再重新发起订阅请求。如果原来发送订阅请求的客户端早已关闭,客户端收不到这个响应,自然也不会重新发起订阅请求。

这样就可以解决前面说的前两个问题了。

如果服务器端推送了一个异常,服务器按照通常的方式将异常返回给客户端,客户端需要把这个异常传递给订阅的回调函数,用户可以自己根据情况来确定是否退订,不应该像之前版本那样忽略掉。而如果不是服务器返回产生的异常,而是因为网络中断产生的异常,则启动自动重试机制。

退订时,客户端除了完成之前所做的工作以外,还应该向服务器发送一个退订请求,请求格式如下:

C<topic>a2{<id>f}z

即在之前表示订阅的调用中增加一个 false 参数,用来表示退订。服务器收到退订请求后,将该客户端设置为已退订主题状态,并返回:

Rnz

而按照前面的约定,客户端收到此响应后,不会再发起订阅请求。订阅就此结束。

通过这种方式,就可以服务器的快速退订检测了。

优点:

以上问题全部可以解决。

缺点:

跟旧的实现不兼容,客户端和服务器端需要同步升级。

@andot
Copy link
Member Author

andot commented May 10, 2018

另外,如果客户端和服务器端传递共享数据的功能先实现的话,推送可以借助这个功能来简化使用。方法如下:

客户端 id 不再通过参数方式传递,而改为通过共享数据传递,客户端 id 设置为客户端的全局共享数据,这样不仅仅对推送的方法带有 id,每个普通调用上也都带有 id,因此,在服务器的可以实现为自动获取该 id,并表示为服务器端的 context.CurrentClient 对象,之后,直接使用该对象来实现对当前调用方法的客户端进行推送。

这样修改之后,客户端的订阅请求是这样的:

Hm1{s2"id"g{AFA7F4B1-A64D-46FA-886F-EF6432AD69A3}}Cs4"news"a1{t}z

退订请求是这样的:

Hm1{s2"id"g{AFA7F4B1-A64D-46FA-886F-EF6432AD69A3}}Cs4"news"a1{f}z

@angwangiot
Copy link

建议在服务器端的推送上加个功能,服务器端可以主动拒绝客户端的订阅。
比如我这边想用客户端订阅的id来判断是否有权限订阅,如果服务器不能主动拒绝,这样的话不管是谁都可以订阅到了,即使业务逻辑上不去发布这个id的消息,也会在服务器端多占用一个tcp长连接。

@andot
Copy link
Member Author

andot commented Jun 13, 2018

嗯,我发现如果按照上面的协议实现的话,服务器端主动拒绝客户端的订阅是可以实现的,只要服务器端实现一个方法,然后返回给客户端

Rnz

就可以实现主动退订。

@alonexy
Copy link

alonexy commented Dec 4, 2018

建议在在客户端增加订阅事件 开启或关闭自动重连
并且增加服务器断开的事件回调方法

@andot
Copy link
Member Author

andot commented Jan 10, 2019

在 hprose 2.0 的推送方案和上面的改进设计方案中,每订阅一个主题,就需要发起一个请求,如果底层连接是 HTTP 或 TCP 半双工的情况下,每个订阅就会独占一个连接,如果一个客户端订阅了许多个推送主题,将会占用多个连接,而浏览器对并发连接数是有限制的,因此如果能够所有订阅同时只使用一个或两个连接(一个或两个并发请求)的话,那么这个问题就可以很好的解决了。为此,上面的改进方案最好做进一步修改。

@andot
Copy link
Member Author

andot commented Jan 10, 2019

客户端必须首先生成一个唯一 id,该 id 可以在客户端生成,例如通过生成 GUID 的方式来表示唯一 id。也可以通过某个服务器来获取,这样可以在保证唯一性的同时,保证该 id 足够短,例如在服务器端通过原子计数器来作为 id 发生器,因为所有的客户端 id 都从该服务器获取,所以每个客户端获取到的计数都是唯一的。

之后,客户端在发起请求时,通过 hprose 3.0 的 requestHeaders 把该 id 发送给服务器端,服务器端通过该 id 来标识和管理客户端。

因为 requestHeaders 部分的数据是相同的,所以下面在讨论推送时,不再写 requestHeaders 部分的数据内容。

订阅请求从每个主题一个方法改为统一方法,该方法用符号 @ 表示。例如:

Cu@a2{s6"topic1"t}z

表示订阅主题 topic1

当再有新的订阅时,比如topic2,那么提交新的请求:

Cu@a2{s6"topic2"t}z

服务器收到该请求之后,前一个请求返回:

Rnz

客户端收到该响应时,不再发起新的请求。

当服务器超时后,返回:

Rm{}z

或者

Rez

表示没有推送数据,此时客户端重新发起请求:

Cu@z

因为已经订阅的主题在服务器端有记录,所以后面重新发起请求时,不需要发送订阅的主题参数。

当订阅的主题有数据推送时,服务器端返回:

Rm2{s6"topic1"a<n>{<data-1><data-2>...<data-n>}s6"topic2"a<n>{<data-1><data-2>...<data-n>}}z

也就是返回一个 map 对象,该对象的键是订阅的主题名称,值为推送的数据的集合。

<n> 为每个主题推送的数据的个数,<data-1> 为推送的第 1 个数据对象,<data-2> 为推送的第 2 个数据对象,<data-n> 为推送的第 n 个数据对象。

只有包含推送数据的主题才包含键值对,没有推送数据的主题,将不返回键值对。

例如只有 topic2 推送了一个数据 null 时,返回的数据为:

Rm1{s6"topic2"a1{n}}z

客户端不需要特殊处理,也可以正常接收服务器端推送的 null

当服务器返回第一个推送结果,而客户端新的请求还没有到达服务器端时,服务器端可以把新的推送数据放入推送队列中存储。当客户端新的请求到达时,服务器端可以一次返回推送队列中积压的所有推送数据。这样一方面减少了不必要的请求响应,另一方面减少了服务器端积压的推送数据,心跳检测时间就可以设置的稍长一点,这样还可以避免掉线误判导致推送数据丢失的问题。

当客户端超时,服务器端未超时,或者客户端关闭后又打开,又或者什么其它原因客户端在第一个订阅请求还未返回时,客户端发送了第二个订阅请求,服务器端对前一个请求也同样返回:

Rnz

客户端如果能够收到该响应,则不再重新发起订阅请求。如果原来发送订阅请求的客户端早已关闭,客户端收不到这个响应,自然也不会重新发起订阅请求。

客户端退订某个主题时,发送:

Cu@a2{<topic>f}z

<topic> 表示退订的主题名,服务器端直接对该请求返回:

Rnz

当服务器端发现该客户端订阅的所有主题都已经取消订阅后,会对之前的订阅请求返回:

Rnz

因为客户端收到该结果时,表示不再发送请求,因此客户端的所有订阅就此结束。

当服务器的要主动拒绝客户端订阅某个主题时,可以返回:

Rm1{s6"topic2"n}z

客户端收到该消息后,可以触发 topic2 的退订事件。客户端会继续发起请求:

Cu@z

当服务器的在收到该请求之后,发现该客户端在服务器端还有其它有效的订阅,会继续按照前面的步骤进行。如果发现该客户端已经没有有效订阅的主题的话,则返回:

Rnz

客户端不再发起新的请求,订阅结束。

@andot
Copy link
Member Author

andot commented Jan 12, 2019

为了清晰区分出用于接受推送消息的长链接调用和用于发送订阅和退订指令的短链接调用,我打算把上面方案中统一的一个方法 @ 改为三个方法,其中 @ 方法仅用于接受推送消息的长链接调用,该方法没有参数。另外两个方法分别用 +- 表示。从名字上就可以看出,+ 表示订阅,- 表示退订。它们都是只有一个参数,就是主题名,这两个方法的返回值为 boolean 类型。当返回值为 true 时,表示订阅或退订成功,同时服务器会触发订阅或退订事件,当返回值为 false 时,表示重复订阅或重复退订,服务器状态不会改变,也不会触发服务器端的订阅或退订事件。

@andot
Copy link
Member Author

andot commented Jan 12, 2019

在具体实现时,客户端提供的 API 接口可以用 subscribe 表示调用 + 方法,用 unsubscribe 表示调用 - 方法。回调方法在调用以上方法时设置。

@andot
Copy link
Member Author

andot commented Jan 15, 2019

最终实现改为一个长连接调用用于获取推送消息,7个短链接调用用于发送指令和推送消息。下面是方法名列表:

< 获取推送消息。(长连接)

+ 订阅主题。
- 取消订阅。
> 推送消息到指定的某个客户端。
>? 推送消息到指定的多个客户端。
>* 推送消息到全体客户端。
? 查询某个客户端是否订阅了某个主题。
| 返回所有订阅过某主题的客户端的唯一id列表。

推送的消息体为:

{
    from: string;
    data: any;
}

这样的结构。例如:

{
    from: '1',
    data: 'hello world'
}

from 表示推送信息的来源,它的值是推送该消息的客户端的唯一id值,如果是服务器推送,则该值为空字符串。data 是推送的实际数据,它可以是任意可序列化类型。

序列化后为:

m2{s4"from"u1s4"data"s11"hello world"}

这是指单一的数据,推送的数据是以数组返回,每个消息是数组中的一个元素。下面是实际实现后的一个推送过程中通讯的消息:

客户端1发送订阅请求
Hm1{s2"id"u1}Cu+a1{s4"test"}z

服务器返回订阅成功给客户端1
Rtz

客户端1开始监听推送消息
Hm1{s2"id"u1}Cu<z

客户端1订阅新的主题 test2
Hm1{s2"id"u1}Cu+a1{s5"test2"}z

服务器返回订阅成功给客户端1
Rtz

客户端1发起新的获取消息的请求
Hm1{s2"id"u1}Cu<z

前一个监听推送的请求被取消(服务器返回给客户端1的响应)
Rnz

客户端2发送推送‘hello'消息到客户端1的test主题
Hm1{s2"id"u2}Cu>a3{s5"hello"s4"test"u1}z

服务器返回表示成功响应给客户端2
Rtz

服务器推送的消息到客户端1,其中包含了推送的数据 'hello' 和数据来源 '2'
Rm1{s4"test"a1{m2{s4"from"u2s4"data"s5"hello"}}}z

客户端2继续推送同一条信息到客户端1的test主题
Hm1{s2"id"u2}Cu>a3{s5"hello"s4"test"u1}z

服务器返回表示成功响应给客户端2
Rtz

客户端2继续推送’world'到客户端1的test2主题
Hm1{s2"id"u2}Cu>a3{s5"world"s5"test2"u1}z

服务器返回表示成功响应给客户端2
Rtz

客户端2继续推送’world'到客户端1的test2主题
Hm1{s2"id"u2}Cu>a3{s5"world"s5"test2"u1}z

服务器返回表示成功响应给客户端2
Rtz

客户端1发起新的获取消息的请求
Hm1{s2"id"u1}Cu<z

服务器批量返回客户端2推送给客户端1的数据
Rm2{s4"test"a1{m2{s4"from"u2s4"data"s5"hello"}}s5"test2"a2{m2{r4;u2r5;s5"world"}m2{r4;u2r5;r10;}}}z

客户端1发起新的获取消息的请求
Hm1{s2"id"u1}Cu<z

客户端1发起退订test主题的请求
Hm1{s2"id"u1}Cu-a1{s4"test"}z

服务器返回表示退订成功的响应给客户端1
Rtz

在上面的例子中,

Hm1{s2"id"u1}Cu<z

是一个长连接请求,请求发出后,服务器不会立即返回,直到该请求被取消,或有推送消息,或者服务器超时才会返回。

其它指令性请求,都会立即返回结果。

另外,该实现是在 RPC 层实现的,并不涉及到具体的编解码协议,上面的例子中虽然通讯内容是 hprose RPC 协议格式,但是 hprose 3.0 的编码层被设计为可替换的,因此通讯部分的数据格式,可以完全是 jsonrpc 协议格式的,或者任何自定义协议格式的。但是推送本身的实现逻辑不变。

目前该设计已经在 TypeScript 版本中被实现。具体代码可以参加:

服务器:https://github.com/hprose/hprose-ts/blob/master/src/rpc/Broker.ts
客户端:https://github.com/hprose/hprose-ts/blob/master/src/rpc/Prosumer.ts

@Kenchizhuo
Copy link

该推送优化方案有人在Java版本实现了吗?

@andot
Copy link
Member Author

andot commented Jan 28, 2019

@Kenchizhuo 目前还没有。到目前为止,已经实现的版本只有 TypeScript 版本,到春节的时候,.NET 版本应该也能完成实现。到正月十五,Dart 版本应该能够完成。至于 Java 版本以后看看什么时候来做吧,Java 版本以后可能会考虑用 Java 11 来写,或者用 Kotlin 来写,底层通讯可能也会从原生 JDK 的 NIO 换成 Netty。所以,Java 版本的 3.0 版本相对于目前的版本改动应该会很大,所以开发周期应该不会很短。

@andot
Copy link
Member Author

andot commented Feb 5, 2019

在最新的实现中,为了减小批量推送时返回响应的大小,将返回的消息体:{ from: string; data: any; } 定义成了一个类,注册的类名为一个特殊字符:@,字段名仍为上面两个字段名。例如:

Rm2{s4"test"a1{c1"@"2{s4"data"s4"from"}o0{s5"hello"u2}}s5"test2"a2{o0{s5"world"u2}r9;}}z

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants