即使一个人,也要活得像军队一样!

Odoo13-- 尝试Odoo与websocket的结合

灵感

最近,看到Odoo13企业版有一个im模块,实现聊天的功能。于是突发奇想,能不能结合websocket,实现一个在线聊天室的功能。类似于OA。

先说一下odoo的即时通讯。

odoo的模块

核心技术

通过数据库消息队列

先说最重要的,Odoo 即时通讯使用了 PostgreSQL 数据库的 listennotify 的机制完成。这个机制是 PostgreSQL 数据库私有的,其它数据库未必支持。所以要用 Odoo 是必须要用 PostgreSQL,这是原因之一。参考这里可以了解更多关于 PostgreSQL listen notify 的信息。

使用 listennotify可以让连接数据库的各个客户端之间进行实时通讯。

通过长连接

连接数据库的客户端不是 Odoo 的客户端,数据库的客户端实际上是 Odoo 的服务端,是 Python 代码连接 数据库;而 Odoo 客户端是通过 Javascript 实现的 Web 应用,它通过长连接方式与 Odoo 后台保持信息的实时性。长连接的链接地址是 longpolling/poll ,Odoo 客户端会发起这个连接请求,Odoo 服务端处理这个请求,如果有这个请求关注的 Channels 的消息,那么这个请求就会立即返回,如果没有消息,这个连接会保持 TIMEOUT 秒,目前 TIMEOUT 是50秒。Channels 就是会话标记,可以理解为一个聊天室、一个群等等,客户 poll 数据的时候要写上它关注的 Channels。

异步处理

如果很多用户同时使用 Odoo,那么 Odoo 为每个客户保持一个连接,这是无疑问的,因为没有连接就没有办法读写数据;但是每个连接是在一个线程里面呢,还是多个呢?简单说,Odoo 只为 longpolling 维护了一个线程或者一个进程(gevent),如果你启动 Odoo 的时候使用了 worker 参数,就意味这 Odoo 要以多进程方式运作,如果没有指定 woker 就是多线程方式,如果你启动的是线程模式,longpolling 将是一个线程,如果你启动的是 worker (进程)模式那么 Odoo 会通过 Popen 一个全新进程,这个全新进程的命令行 加上 gevent,很怪异吧,确实就是这么干的。

1
2
3
4
5
def long_polling_spawn(self):
nargs = stripped_sys_argv()
cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
popen = subprocess.Popen(cmd)
self.long_polling_pid = popen.pid

把原来的命令行插入一个 gevent,再启动一遍。当然后续的代码会判断如果是以 gevent 启动命令的,这是要启动 longpolling。

gevent 在 Python 3 asyncio 的大环境下是个过时的技术了,它使用了 Monkey Patch 的方式对 Python 库进行了异步化,感觉代码的书写方式还是一样,但是已经异步化了。好处是代码在没有 gevent 的时候可以同步跑,引入 gevent 后不用改变代码逻辑就可以异步化。有人会问,异步化有啥好处啊?异步化可以让 Odoo 同时处理多个连接,就这么简单,如果没有异步化,一个连接就占用了 Odoo,别的连接进不来,解决这个问题的老方法是启动更多进程,但是进程的方式太重了,随着互联网服务的普及,开发人员发现实际上只需要维护 I/O 并不需要启动多个进程或者多个进程,只需要维护好文件描述符,并且能够正确发现这些描述符什么时候该读什么时候该写。selelct,poll,epoll一步一步把异步I/O的性能榨干了最后一滴血。

在 Python 2 的时候,Python 没有内置异步 I/O 的功能,所以 gevent,Tornado 都是解决 Python 异步 I/O 问题的。Odoo 使用了 gevent,当 longpolling 服务正在服务一个客户端的时候,也没有任何消息给这个客户端,那么这个客户端将保持连接 50 秒,这时候 longpolling 服务端会把基于这个连接的处理 wait 让出 CPU,让其它连接能进来。 当这个连接的文件描述符准备好读写的时候 Odoo 通过 select 调用得以了解。然后会通过 Thread 或者 gevent 的 Event 通知等待的客户连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def loop(self):
""" Dispatch postgres notifications to the relevant polling threads/greenlets """
_logger.info("Bus.loop listen imbus on db postgres")
with odoo.sql_db.db_connect('postgres').cursor() as cr:
conn = cr._cnx
cr.execute("listen imbus")
cr.commit();
while True:
if select.select([conn], [], [], TIMEOUT) == ([], [], []):
pass
else:
conn.poll()
channels = []
while conn.notifies:
channels.extend(json.loads(conn.notifies.pop().payload))
# dispatch to local threads/greenlets
events = set()
for channel in channels:
events.update(self.channels.pop(hashable(channel), set()))
for event in events:
event.set()

上边的这段代码在 bus 模块里面,Odoo 只有一个线程或者 gevent 程序去无差别的listen 系统所有的 imbus 上的消息,notify imbus 的消息都会让 select 返回准备好的文件描述符(不是空的,所以就不会等于 ([],[],[])),无差别就是它不判断 Channel,而每个客户端是需要关注 Channel的,所以这是系统级别的不是用户级别的,它取出数据后通过 event set 来通知那些 wait 在具体 event (关联了 Channel)上的客户。

Channels 的 Overload
每次 longpolling 的 poll 请求都要带上这个用户想要关注的 channels,而 用户怎么知道自己要 polling 什么channels呢?

Channels 一般来自两种可能,一个是同一种应用导致的会话数量的增加,比如在线客服,每个新访客都有可能跟 Odoo 的用户建立一个 Channel 就是会话,这样就会有很多会话。

还有一种可能就是,Odoo 有很多应用,每个应用都会有自己建立或者判断 Channel 的方式,在线客服是 Odoo 的一个应用,CRM 也是一个应用,每个应用对 Channel 的标记和维护方法各不相同,一般是一个元组 (db,table,id) 再 hashable 或者文本化一下,就变成字符串,作为 Channel 的唯一标记,具体有多少个这样的 Channels 也是存储在各自应用的表里面。所以 bus 应用的 Controller 提供了一个可以 Overload 的机会来修改 Channels,就是 _load。

1
2
3
4
5
6
7
8
# override to add channels
def _poll(self, dbname, channels, last, options):
# update the user presence
if request.session.uid and 'bus_inactivity' in options:
request.env['bus.presence'].update(options.get('bus_inactivity'))
request.cr.close()
request._cr = None
return dispatch.poll(dbname, channels, last, options)

它轻描淡写的注释暴露了它存在的意义。

再看 mail 应用下的 controller 对这个函数的 overload。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# --------------------------
# Extends BUS Controller Poll
# --------------------------
def _poll(self, dbname, channels, last, options):
if request.session.uid:
partner_id = request.env.user.partner_id.id

if partner_id:
channels = list(channels) # do not alter original list
for mail_channel in request.env['mail.channel'].search([('channel_partner_ids', 'in', [partner_id])]):
channels.append((request.db, 'mail.channel', mail_channel.id))
# personal and needaction channel
channels.append((request.db, 'res.partner', partner_id))
channels.append((request.db, 'ir.needaction', partner_id))
return super(MailChatController, self)._poll(dbname, channels, last, options)

把在 mail (就是讨论应用)中需要的 channels 都圈出来提供给 bus 应用去处理。

这里面又学到一个 Odoo 的知识,如何搜索 many2many 的字段 (channel partner ids),因为 many2many 是 Odoo 加了一个中间表实现的,没看过这段代码还不知道咋搜索呢。这里面的 channel partner ids 是在 mail channel 中对应的 partner id,在 res partner 表中也有 partner 对应的 mail channel。这是一个多对多的关系,一个 mail channel 可以含有多个 partner,一个partner 可以在多个 mail channel 中,这很自然,人可以在很多对话中,对话中含有很多人 。但是如果你想搜索哪些对话中含有我这个人的怎么搜?

其它
Odoo 的即时通讯几乎都在 bus 这个 addon 下面,但是在odoo 全局的代码中也有很多配合的 code,比如上文提到的 gevent 命令行;还有更加复杂的部分,就是 WSGI 和 数据连接的处理部分,由于 longpolling 同时重用了普通 httprequest 和数据库运行环境 (registry,Enviroments,Enviroment,cusor),这段代码比较乱,不如 addon 里面的结构清晰,当然可能也是为了让 addon 结构清晰,不得不做出的妥协。值得说明的是,当 longpolling 的请求来的时候,WSGI 请求自带的 Odoo 数据库执行环境会被抛弃,而是每次请求重新再次建立:

1
2
3
4
event.wait(timeout=timeout)
with registry.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
notifications = env['bus.bus'].poll(channels, last, options)

让我们知道了 Odoo 如何每次建立数据环境。如果不是每次建立环境那么这里的数据操作别的客户不会同时发现的。

通过分析 Odoo 的 IM 实现过程可以看出 Odoo 的技术的确有点过时了,跟踪的不够猛。因为 Python 3 已经支持 syncio 了,关于 asyncio 可以读读这个 blog 。

如果通过 asyncio 去实现,我的思路是在 asyncio 中加入 postgresql connection 的描述符,就是上边用来select 的,watching 这个描述符。当有数据的时候 callback 就会运行,再去通过 asyncio 的 locks 中的 Event 去 set()。用 asyncio.wait_for(event.wait(), timeout) 来响应用户的请求,用户的 HTTP 请求就会被阻塞直到 Event 被 set 或者超时,而 CPU 会被让出,完美。

1
2
3
`loop.``add_reader`(*fd*, *callback*, **args*)

Start monitoring the *fd* file descriptor for read availability and invoke *callback* with the specified arguments once *fd* is available for reading.

这样就用原生的 Python 3 解决了,不需要引入 gevent,也不需要引入异步的 PostgreSQL Python 库,重用原来的 psycopg2 阻塞库。

websocket

websocket是一种基于http的全双工协议,即建立连接后,server端既可主动向客户端发送消息,客户端也可以主动向server端发送消息,这种方式性能开销小,不像http那样每次都要重新握手进行连接,也不像长连接那样需要server端一直hold住连接从而节省了server端的性能开销。

Python对websocket的实现有多种,比较著名的有:Autobahn、Django Channel、Flask-SocketIO、Websocket-client、Crossbar.io等。
这里,我选用了 Tornado 框架。

以下为最终实现效果:

websocket

源码: 有需要的同学,请联系作者: WX与QQ: 897728638。

可能的场景

在某个对象被创建的时候,现实对某些正在浏览该对象的列表视图的人进行实时的通知,并播放声音进行提醒。这里的业务场景是,工厂的操作员需要某个技术员进行协助的时候,发起一个协助请求,技术员在接收到通知后去操作员那里进行协助。

以上。

-------------本文结束感谢您的阅读-------------