cool-admin-midway/packages/other/mqemitter-redis/mqemitter-redis.js
2023-04-04 09:15:48 +08:00

251 lines
5.8 KiB
JavaScript

"use strict";
const Redis = require("ioredis");
const MQEmitter = require("mqemitter");
const hyperid = require("hyperid")();
const inherits = require("inherits");
const LRU = require("lru-cache");
const msgpack = require("msgpack-lite");
const EE = require("events").EventEmitter;
const Pipeline = require("ioredis-auto-pipeline");
function MQEmitterRedis(opts) {
if (!(this instanceof MQEmitterRedis)) {
return new MQEmitterRedis(opts);
}
opts = opts || {};
this._opts = opts;
if (opts instanceof Array) {
this.subConn = new Redis.Cluster(opts);
this.pubConn = new Redis.Cluster(opts);
} else {
this.subConn = new Redis(opts.connectionString || opts);
this.pubConn = new Redis(opts.connectionString || opts);
}
this._pipeline = Pipeline(this.pubConn);
this._topics = {};
this._cache = new LRU({
max: 10000,
ttl: 60 * 1000, // one minute
});
this.state = new EE();
const that = this;
function onError(err) {
if (err && !that.closing) {
that.state.emit("error", err);
}
}
this._onError = onError;
function handler(sub, topic, payload) {
const packet = msgpack.decode(payload);
if (!that._cache.get(packet.id)) {
that._emit(packet.msg);
}
that._cache.set(packet.id, true);
}
this.subConn.on("messageBuffer", function (topic, message) {
handler(topic, topic, message);
});
this.subConn.on("pmessageBuffer", function (sub, topic, message) {
handler(sub, topic, message);
});
this.subConn.on("connect", function () {
that.state.emit("subConnect");
});
this.subConn.on("error", function (err) {
that._onError(err);
});
this.pubConn.on("connect", function () {
that.state.emit("pubConnect");
});
this.pubConn.on("error", function (err) {
that._onError(err);
});
MQEmitter.call(this, opts);
this._opts.regexWildcardOne = new RegExp(
this._opts.wildcardOne.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&"),
"g"
);
this._opts.regexWildcardSome = new RegExp(
(this._opts.matchEmptyLevels
? this._opts.separator.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&") +
"?"
: "") +
this._opts.wildcardSome.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&"),
"g"
);
}
inherits(MQEmitterRedis, MQEmitter);
["emit", "on", "removeListener", "close"].forEach(function (name) {
MQEmitterRedis.prototype["_" + name] = MQEmitterRedis.prototype[name];
});
MQEmitterRedis.prototype.close = function (cb) {
cb = cb || noop;
if (this.closed || this.closing) {
return cb();
}
this.closing = true;
let count = 2;
const that = this;
function onEnd() {
if (--count === 0) {
that._close(cb);
}
}
this.subConn.on("end", onEnd);
this.subConn.quit();
this.pubConn.on("end", onEnd);
this.pubConn.quit();
return this;
};
MQEmitterRedis.prototype._subTopic = function (topic) {
return topic
.replace(this._opts.regexWildcardOne, "*")
.replace(this._opts.regexWildcardSome, "*");
};
MQEmitterRedis.prototype.on = function on(topic, cb, done) {
const subTopic = this._subTopic(topic);
const onFinish = function () {
if (done) {
setImmediate(done);
}
};
this._on(topic, cb);
if (this._topics[subTopic]) {
this._topics[subTopic]++;
onFinish.call(this);
return this;
}
this._topics[subTopic] = 1;
if (this._containsWildcard(topic)) {
this.subConn.psubscribe(subTopic, onFinish.bind(this));
} else {
this.subConn.subscribe(subTopic, onFinish.bind(this));
}
return this;
};
MQEmitterRedis.prototype.emit = function (msg, done) {
done = done || this._onError;
if (this.closed) {
const err = new Error("mqemitter-redis is closed");
return done(err);
}
const packet = {
id: hyperid(),
msg,
};
this._pipeline
.publish(msg.topic, msgpack.encode(packet))
.then(() => done())
.catch(done);
};
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
const subTopic = this._subTopic(topic);
const onFinish = function () {
if (done) {
setImmediate(done);
}
};
this._removeListener(topic, cb);
if (--this._topics[subTopic] > 0) {
onFinish();
return this;
}
delete this._topics[subTopic];
if (this._containsWildcard(topic)) {
this.subConn.punsubscribe(subTopic, onFinish);
} else if (this._matcher.match(topic)) {
this.subConn.unsubscribe(subTopic, onFinish);
}
return this;
};
MQEmitterRedis.prototype._containsWildcard = function (topic) {
return (
topic.indexOf(this._opts.wildcardOne) >= 0 ||
topic.indexOf(this._opts.wildcardSome) >= 0
);
};
function noop() {}
module.exports = MQEmitterRedis;
function MQEmitterRedisPrefix(pubSubPrefix, options) {
MQEmitterRedis.call(this, options);
this._pubSubPrefix = pubSubPrefix;
this._sym_proxiedCallback = Symbol("proxiedCallback");
}
inherits(MQEmitterRedisPrefix, MQEmitterRedis);
MQEmitterRedisPrefix.prototype.on = function (topic, cb, done) {
const t = this._pubSubPrefix + topic;
cb[this._sym_proxiedCallback] = function (packet, cbcb) {
const t = packet.topic.slice(this._pubSubPrefix.length);
const p = { ...packet, topic: t };
return cb.call(this, p, cbcb);
}.bind(this);
return MQEmitterRedis.prototype.on.call(
this,
t,
cb[this._sym_proxiedCallback],
done
);
};
MQEmitterRedisPrefix.prototype.removeListener = function (topic, func, done) {
const t = this._pubSubPrefix + topic;
const f = func[this._sym_proxiedCallback];
return MQEmitterRedis.prototype.removeListener.call(this, t, f, done);
};
MQEmitterRedisPrefix.prototype.emit = function (packet, done) {
const t = this._pubSubPrefix + packet.topic;
const p = { ...packet, topic: t };
return MQEmitterRedis.prototype.emit.call(this, p, done);
};
module.exports.MQEmitterRedisPrefix = MQEmitterRedisPrefix;