基于BottledWater-PG+nodejs实时地图应用实践
目录
前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。
一 服务器端
var fs = require('fs');
var http = require('http');
var socket = require('socket.io');
var Kafka = require('node-rdkafka');
var server = http.createServer(function(req, res) {
res.writeHead(200, { 'Content-type': 'text/html'});
res.end(fs.readFileSync(__dirname + '/index.html'));
}).listen(8081, function() {
console.log('Listening at: http://localhost:8081');
});
//注册socket.io
var socketio=socket.listen(server);
socketio.on('connection', function (socketclient) {
console.log('已连接socket:');
//socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人
//socketclient.emit('GPSCoor', data.payload);//广播给自己
});
var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': '192.168.43.27:9092',
'group.id': 'node-rdkafka-consumer-flow-example',
'enable.auto.commit': false
});
var topicName = 'gps';
//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
console.log(log);
});
//打印错误
consumer.on('error', function(err) {
console.error('Error from consumer');
console.error(err);
});
consumer.on('ready', function(arg) {
console.log('consumer ready.' + JSON.stringify(arg));
consumer.subscribe([topicName]);
//准备消费消息
consumer.consume();
});
consumer.on('data', function(m) {
console.log(m);
let _data;
if(m.value==null)//delete操作发送来的消息
{
_data=JSON.parse(m.key);
_data.tg_op='delete';
}
else{
_data=m.value.toString();
_data=JSON.parse(_data);
}
console.log(_data);
socketio.emit('GPSCoor', _data);//广播给所有的客户端
});
consumer.on('disconnected', function(arg) {
console.log('consumer disconnected. ' + JSON.stringify(arg));
});
//启动
consumer.connect();
二 客户端
实时地图应用
三 测试成果
3.1 新增
mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
INSERT 0 1
3.2 修改
mcsas=# update gps set geom='Point(115 40)' where name='opy';
UPDATE 1
3.3 删除
mcsas=# delete from gps where name='opy';
DELETE 1
四 总结
BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。