mirror of
https://github.com/cool-team-official/cool-admin-midway.git
synced 2024-11-01 22:20:30 +08:00
251 lines
5.8 KiB
JavaScript
251 lines
5.8 KiB
JavaScript
"use strict";
|
|
|
|
const Redis = require("ioredis");
|
|
const MQEmitter = require("mqemitter");
|
|
const hyperid = require("hyperid")();
|
|
const inherits = require("inherits");
|
|
const LRU = require("lru-cache");
|
|
const msgpack = require("msgpack-lite");
|
|
const EE = require("events").EventEmitter;
|
|
const Pipeline = require("ioredis-auto-pipeline");
|
|
|
|
/**
 * Redis-backed MQEmitter.
 *
 * Opens two Redis connections (one used for subscribing, one for publishing),
 * forwards every message received on the subscriber connection to the local
 * emitter, and drops messages whose packet id has already been seen.
 *
 * Can be called with or without `new`.
 *
 * @param {object|Array} [opts] - When an array, it is passed to
 *   `Redis.Cluster` for both connections. Otherwise `opts.connectionString`
 *   (or, if absent, `opts` itself) is passed to `Redis`. The same object is
 *   also handed to the `MQEmitter` constructor.
 */
function MQEmitterRedis(opts) {
  if (!(this instanceof MQEmitterRedis)) {
    return new MQEmitterRedis(opts);
  }

  opts = opts || {};
  this._opts = opts;

  // Array.isArray also recognises arrays created in another realm
  // (vm context, worker), which `instanceof Array` does not.
  if (Array.isArray(opts)) {
    this.subConn = new Redis.Cluster(opts);
    this.pubConn = new Redis.Cluster(opts);
  } else {
    this.subConn = new Redis(opts.connectionString || opts);
    this.pubConn = new Redis(opts.connectionString || opts);
  }

  this._pipeline = Pipeline(this.pubConn);

  // Listener count per Redis subscription. Keys are user-supplied topics, so
  // the map has no prototype: a topic such as "constructor" or "toString"
  // must not resolve to an inherited Object.prototype member.
  this._topics = Object.create(null);

  // Recently seen packet ids, used to deliver each packet only once.
  this._cache = new LRU({
    max: 10000,
    ttl: 60 * 1000, // one minute
  });

  this.state = new EE();

  const that = this;

  // Connection errors are re-emitted on `state`, except while closing.
  function onError(err) {
    if (err && !that.closing) {
      that.state.emit("error", err);
    }
  }

  this._onError = onError;

  // Decode an incoming payload and emit it locally unless its id was already
  // handled; the id is (re)recorded in the cache either way.
  function handler(payload) {
    const packet = msgpack.decode(payload);
    if (!that._cache.get(packet.id)) {
      that._emit(packet.msg);
    }
    that._cache.set(packet.id, true);
  }

  this.subConn.on("messageBuffer", function (topic, message) {
    handler(message);
  });

  this.subConn.on("pmessageBuffer", function (sub, topic, message) {
    handler(message);
  });

  this.subConn.on("connect", function () {
    that.state.emit("subConnect");
  });

  this.subConn.on("error", function (err) {
    that._onError(err);
  });

  this.pubConn.on("connect", function () {
    that.state.emit("pubConnect");
  });

  this.pubConn.on("error", function (err) {
    that._onError(err);
  });

  // NOTE(review): the code below reads wildcardOne / wildcardSome / separator
  // / matchEmptyLevels from opts right after this call, so MQEmitter
  // presumably fills in their defaults on `opts` - confirm against mqemitter.
  MQEmitter.call(this, opts);

  // Escape a literal so it can be embedded in a RegExp source.
  const escapeForRegExp = function (text) {
    return text.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&");
  };

  this._opts.regexWildcardOne = new RegExp(
    escapeForRegExp(this._opts.wildcardOne),
    "g"
  );

  // With matchEmptyLevels, an optional separator in front of the wildcard is
  // consumed together with it.
  this._opts.regexWildcardSome = new RegExp(
    (this._opts.matchEmptyLevels
      ? escapeForRegExp(this._opts.separator) + "?"
      : "") + escapeForRegExp(this._opts.wildcardSome),
    "g"
  );
}
|
|
|
|
// MQEmitterRedis extends MQEmitter.
inherits(MQEmitterRedis, MQEmitter);

// Snapshot the inherited implementations under "_"-prefixed names. This runs
// before the overrides further down are assigned, so `_emit`, `_on`,
// `_removeListener` and `_close` refer to what the prototype chain provides
// at this point.
for (const method of ["emit", "on", "removeListener", "close"]) {
  MQEmitterRedis.prototype[`_${method}`] = MQEmitterRedis.prototype[method];
}
|
|
|
|
/**
 * Quit both Redis connections and then close the underlying emitter.
 *
 * @param {Function} [cb] - Passed on to `_close` once both connections have
 *   emitted "end". Invoked immediately when the emitter is already closed or
 *   closing.
 * @returns {MQEmitterRedis|*} `this`, or the result of `cb()` when already
 *   closed/closing.
 */
MQEmitterRedis.prototype.close = function (cb) {
  const callback = cb || noop;

  if (this.closed || this.closing) {
    return callback();
  }

  this.closing = true;

  const connections = [this.subConn, this.pubConn];
  let pending = connections.length;

  // Runs once per connection "end"; the last one triggers the base close.
  const onEnd = () => {
    pending -= 1;
    if (pending === 0) {
      this._close(callback);
    }
  };

  for (const conn of connections) {
    conn.on("end", onEnd);
    conn.quit();
  }

  return this;
};
|
|
|
|
/**
 * Translate an emitter topic into the string used for the Redis
 * subscription: every match of the two precompiled wildcard expressions is
 * replaced with "*".
 *
 * @param {string} topic
 * @returns {string}
 */
MQEmitterRedis.prototype._subTopic = function (topic) {
  const { regexWildcardOne, regexWildcardSome } = this._opts;
  const singlesReplaced = topic.replace(regexWildcardOne, "*");
  return singlesReplaced.replace(regexWildcardSome, "*");
};
|
|
|
|
/**
 * Register a listener and make sure the matching Redis subscription exists.
 *
 * The listener is always added to the local emitter. Redis is only contacted
 * for the first listener of a given subscription string; later listeners
 * just bump the counter in `this._topics`.
 *
 * @param {string} topic - Topic, possibly containing wildcards.
 * @param {Function} cb - Listener handed to the inherited `on`.
 * @param {Function} [done] - Scheduled with `setImmediate` once the listener
 *   is registered (after the Redis call completes when one is issued).
 * @returns {MQEmitterRedis} this
 */
MQEmitterRedis.prototype.on = function on(topic, cb, done) {
  const subTopic = this._subTopic(topic);
  const onFinish = function () {
    if (done) {
      setImmediate(done);
    }
  };

  this._on(topic, cb);

  // Own-property check, not truthiness: topics are arbitrary strings, and a
  // name like "constructor" or "toString" would otherwise hit an inherited
  // Object.prototype member, turn the counter into NaN and skip the Redis
  // subscription for the first listener.
  if (Object.prototype.hasOwnProperty.call(this._topics, subTopic)) {
    this._topics[subTopic]++;
    onFinish();
    return this;
  }

  this._topics[subTopic] = 1;

  if (this._containsWildcard(topic)) {
    this.subConn.psubscribe(subTopic, onFinish);
  } else {
    this.subConn.subscribe(subTopic, onFinish);
  }

  return this;
};
|
|
|
|
/**
 * Publish a message through Redis.
 *
 * The message is wrapped in a packet with a freshly generated id, encoded
 * with msgpack and published on `msg.topic` via the auto-pipeline.
 *
 * @param {object} msg - Message to publish; `msg.topic` is the channel.
 * @param {Function} [done] - Called with no arguments on success or with the
 *   error on failure. Defaults to the instance error handler.
 * @returns {*} The result of `done(err)` when the emitter is closed,
 *   otherwise undefined.
 */
MQEmitterRedis.prototype.emit = function (msg, done) {
  done = done || this._onError;

  if (this.closed) {
    const err = new Error("mqemitter-redis is closed");
    return done(err);
  }

  const packet = {
    id: hyperid(),
    msg,
  };

  // Two-argument then(): the rejection handler covers a failed publish only.
  // With `.then(ok).catch(done)` an exception thrown by `done` itself would
  // be caught and `done` would be invoked a second time.
  this._pipeline
    .publish(msg.topic, msgpack.encode(packet))
    .then(() => done(), done);
};
|
|
|
|
/**
 * Remove a listener and drop the Redis subscription once its counter in
 * `this._topics` is no longer positive.
 *
 * @param {string} topic - Topic the listener was registered with.
 * @param {Function} cb - Listener handed to the inherited `removeListener`.
 * @param {Function} [done] - Scheduled with `setImmediate` when finished.
 * @returns {MQEmitterRedis} this
 */
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
  const subTopic = this._subTopic(topic);
  const notifyDone = () => {
    if (done) {
      setImmediate(done);
    }
  };

  this._removeListener(topic, cb);

  // Other listeners still share this subscription: keep it.
  const remaining = --this._topics[subTopic];
  if (remaining > 0) {
    notifyDone();
    return this;
  }

  delete this._topics[subTopic];

  if (this._containsWildcard(topic)) {
    this.subConn.punsubscribe(subTopic, notifyDone);
    return this;
  }

  if (this._matcher.match(topic)) {
    this.subConn.unsubscribe(subTopic, notifyDone);
  }

  return this;
};
|
|
|
|
/**
 * Tell whether a topic contains either configured wildcard string.
 *
 * @param {string} topic
 * @returns {boolean}
 */
MQEmitterRedis.prototype._containsWildcard = function (topic) {
  const { wildcardOne, wildcardSome } = this._opts;

  if (topic.indexOf(wildcardOne) !== -1) {
    return true;
  }
  return topic.indexOf(wildcardSome) !== -1;
};
|
|
|
|
/** Do-nothing fallback used when `close` is called without a callback. */
function noop() {
  // intentionally empty
}
|
|
|
|
// CommonJS export: the module itself is the MQEmitterRedis constructor.
module.exports = MQEmitterRedis;
|
|
|
|
/**
 * MQEmitterRedis variant that prepends a fixed prefix to every topic it
 * subscribes to or publishes on.
 *
 * @param {string} pubSubPrefix - Prefix prepended to topics.
 * @param {object|Array} [options] - Forwarded unchanged to MQEmitterRedis.
 */
function MQEmitterRedisPrefix(pubSubPrefix, options) {
  MQEmitterRedis.call(this, options);

  Object.assign(this, {
    _pubSubPrefix: pubSubPrefix,
    // Per-instance key under which a listener's wrapper function is stored
    // on the listener itself.
    _sym_proxiedCallback: Symbol("proxiedCallback"),
  });
}
|
|
// MQEmitterRedisPrefix extends MQEmitterRedis; its on/removeListener/emit
// overrides delegate to MQEmitterRedis.prototype explicitly.
inherits(MQEmitterRedisPrefix, MQEmitterRedis);
|
|
/**
 * Register a listener on the prefixed topic.
 *
 * The listener is wrapped in a proxy that strips the prefix from
 * `packet.topic` before calling `cb`. The proxy is stored on `cb` under this
 * instance's symbol so that `removeListener` can find it again.
 *
 * @param {string} topic - Topic without the prefix.
 * @param {Function} cb - Listener called as `cb(packet, cbcb)` with the
 *   un-prefixed topic.
 * @param {Function} [done] - Forwarded to MQEmitterRedis#on.
 * @returns {MQEmitterRedis} Whatever MQEmitterRedis#on returns.
 */
MQEmitterRedisPrefix.prototype.on = function (topic, cb, done) {
  const prefixedTopic = this._pubSubPrefix + topic;
  const proxyKey = this._sym_proxiedCallback;

  // Reuse the proxy when `cb` is already registered on another topic. The
  // proxy does not depend on the topic, and creating a new one each time
  // overwrote the stored reference, so the earlier registration could no
  // longer be removed through removeListener.
  if (typeof cb[proxyKey] !== "function") {
    cb[proxyKey] = (packet, cbcb) => {
      const unprefixed = packet.topic.slice(this._pubSubPrefix.length);
      return cb.call(this, { ...packet, topic: unprefixed }, cbcb);
    };
  }

  return MQEmitterRedis.prototype.on.call(
    this,
    prefixedTopic,
    cb[proxyKey],
    done
  );
};
|
|
/**
 * Remove a listener from the prefixed topic.
 *
 * Looks up the wrapper stored on `func` under this instance's symbol and
 * removes that wrapper via MQEmitterRedis#removeListener.
 *
 * @param {string} topic - Topic without the prefix.
 * @param {Function} func - The listener originally passed to `on`.
 * @param {Function} [done] - Forwarded to MQEmitterRedis#removeListener.
 * @returns {MQEmitterRedis} Whatever MQEmitterRedis#removeListener returns.
 */
MQEmitterRedisPrefix.prototype.removeListener = function (topic, func, done) {
  const prefixedTopic = this._pubSubPrefix + topic;
  const proxied = func[this._sym_proxiedCallback];
  const baseRemove = MQEmitterRedis.prototype.removeListener;

  return baseRemove.call(this, prefixedTopic, proxied, done);
};
|
|
/**
 * Publish a packet on its prefixed topic.
 *
 * The caller's packet is not modified; a shallow copy with the prefixed
 * topic is passed to MQEmitterRedis#emit.
 *
 * @param {object} packet - Message with an un-prefixed `topic`.
 * @param {Function} [done] - Forwarded to MQEmitterRedis#emit.
 * @returns {*} Whatever MQEmitterRedis#emit returns.
 */
MQEmitterRedisPrefix.prototype.emit = function (packet, done) {
  const prefixedTopic = this._pubSubPrefix + packet.topic;
  const baseEmit = MQEmitterRedis.prototype.emit;

  return baseEmit.call(this, { ...packet, topic: prefixedTopic }, done);
};
|
|
|
|
// The prefixing variant is exposed as a property of the main export.
module.exports.MQEmitterRedisPrefix = MQEmitterRedisPrefix;
|