-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
支持PostgreSQL协议的讨论帖 #1546
Comments
在理解ComplexTask上遇到了一些问题,于是请教了workflow的开发同学,在workflow小伙伴的支持下,基本理顺了复合任务的工作机制和流程。 ComplexTask的执行路径
ComplexTask中的seqid每个ComplexTask里都有一个seqid成员变量,这个是由框架底层去维护的,每进行一次任务(调用一次message_out()函数),seqid都会+1,可以使用seqid来辅助判断当前进行到哪一步了。 |
差不多是这个流程。seqid就是这次请求是这个长连接上的第几次交互的意思,对于redis来讲,如果是第0次,则需要发送AUTH请求或SELECT请求(无需AUTH时),也可能是实际用户请求(既没有AUTH也没有SELECT);如果是第1次,有可能是SELECT请求或用户请求。 finish_once返回false表示此次交互不是用户请求,所以整个task会再dispatch一次,以执行实际用户请求。 但注意,第二次dispatch依然可能拿到一个新连接(seqid==0),原来AUTH过的连接是有可能被另外一个相同用户名密码的任务抢走的,这时候又会重新AUTH。 当然,第二次dispatch也可能拿到另外一个任务释放的连接,总之不一定就是刚才个连接。这也是为什么MySQL里,认证过程中server下发的seed是保存在连接上,而不是保存在任务里。如果你能理解到这一步,基本上就算很熟悉我们的模式了。 |
我看到seqid是保存在CommSession,而WFComplexClientTask是最终继承自CommSession,那会不会出现seqid>0,但实际上这个连接还没有认证过。还是说框架会去自动更新seqid |
seqid没有保存在session上啊,每次从连接上拿的。代码在这一行: workflow/src/kernel/Communicator.cc Line 1743 in 842fbdf
entry是一个CommConnEntry结构,代表连接入口。session每次发起都会从那取一个seq下来。 总之一个WFComplexClientTask的多次dispatch(finish_once返回false),拿到的seqid并不是递增的。以前我们的一个版本,如果访问一个域名,域名下多个IP,一次MySQL请求就会把所有的IP都AUTH一遍。不过这个问题已经修复了。 |
明白了 感谢 |
在ComplexClientTask中,相同的info信息会共用连接,Mysql启用事务的方式是通过给info指定一个唯一的ID来保证这个连接不会被其他的任务共享,这个连接的生命周期是交给用户来控制(用户来控制是否disconnect)。 对于PostgreSQL协议的实现来说,想要简化这个过程 PG有下面两种协议:
对于扩展查询协议,会和数据库有多次交互,直到最后发送sync,才表示一个扩展查询结束,所有的交互都必须在同一个连接里执行。所以对于扩展查询协议,在复合任务执行期间默认就应该独占连接(无论用户是否指定开启事务)。 同时可以让用户在创建任务时,显式的指定开启事务(默认不开启),如果开启了事务,就在复合任务执行期间独占一个连接。 如果说每次创建复合任务时,都指定一个唯一的info来获取连接,任务结束后释放连接,会浪费系统资源、增加延迟(每次都要重新建立连接,重新认证),能否通过动态修改info的方式来做到复用连接?
|
先说一下,目前你可以先不用关注复合任务交互这块,可以先搞定协议解析,再考虑怎么交互。 我们ComplexClientTask的info部分,是为了用来区别通讯目标的,同一个通讯目标下,所有连接都认为是等价的,想发起请求,可以选取通讯目标下任何一个IP的任何一个连接使用。我们在获得通讯目标时,可以要求固定地址+固定连接,这块在最新的MySQL代码里,是这个地方(最近有一些小修改): 对于PG里的扩展查询协议,我的理解就是现在WFMySQLConnection解决的问题,这个类通过指定一个id来选取一个固定连接,这个连接上的任务都通过这个二级工厂来产生。哪怕你把工厂(就是这个WFMySQLConnection)delete了,只要连接没有关闭,之后还是可以通过相同的id找回这个连接。你的扩展查询到时候搞一个一样的就可以了。 看起来并没有你想像的那么复杂,建议先关注一下协议的实现。 |
理解您的意思,如果用户如果想要在Mysql并发的执行事务,就需要开多个Connection,然后由二级工厂来创建任务,id分配和维护由用户来做。 我先把精力放在协议的实现,这个可以后面再讨论。 |
用户自己来指定也可以啊,就是把URL写成: mysql://user:pass@localhost/dbname?transation=xxx 我们的任务都是并发的,不太可能完全不需要用户指定,就知道发到哪个连接吧。 |
明白了,谢谢。 if (!transaction.empty())
{
this->WFComplexClientTask::set_info(std::string("?maxconn=1&") +
info + "|txn:" + transaction);
this->set_fixed_addr(true);
} |
想请教下复合任务中的keep_alive_timeout这个成员方法应该怎么用 我看到分别在Communicator.cc中的下面三个方法里调用了keep_alive_timeout:
我的理解是他可以作为一个钩子,通过在复合任务中重写这个方法,在一些环节触发用来处理相关的逻辑,如果返回0还可以把连接给关上,请问我在重写这个方法的时候,应该把哪些逻辑放在这里去实现呢? |
前两个都是server任务的,你只需关注第三个调用位置。 client收完回复,调用handle之前,需要通过这个函数返回连接可以保持的时间。这个参考各个协议的实现就可以了。 |
好的,感谢 |
@oisin9 我建议你先从Request和Response出发,把实际的用户请求消息先做出来。也不用考虑startup消息的实现。有了Request和Response,你就可以按我们简单自定义协议的server/client的方法,做出一组可以通讯的server/client来调试消息了。之后再做startup消息以及任务实现。 |
感谢您的建议,我目前的思路是想把ComplexTask实现,然后用真实的PG数据库一步步调试,从startup到后面的协议,按顺序去实现。 |
这么做的好处是更好的理解server/client一体,方便将来实现WFPostgresServer,可以用来做Postgres的代理服务器或与Postgres兼容的数据库服务。 |
@oisin9 麻烦合并一下master最新代码。你之前fork的那个版本,正好在创建自定义协议任务时有点问题。 |
PostgreSQL数据库是一个流行的开源关系型数据库,计划为workflow框架增加对PostgreSQL协议的支持。后续的进展和讨论将会放到这个Issue中进行。
开发workflow的PG协议的Fork仓库地址:
https://github.com/oisin9/workflow/tree/feature-pg
The text was updated successfully, but these errors were encountered: