https://github.com/MetaMask/json-rpc-middleware-stream/blob/master/test/index.js#L20
A small toolset for streaming json rpc and matching requests and responses. Made to be used with json-rpc-engine
.
可以用来与json-rpc-engine
结合使用的,对输入的json rpc进行读入、写出处理
json-rpc-middleware-stream/index.js
const SafeEventEmitter = require(‘safe-event-emitter‘)const DuplexStream = require(‘readable-stream‘).Duplex //即一个可读且可写的流module.exports = createStreamMiddlewarefunction createStreamMiddleware() { ?const idMap = {} ?const stream = new DuplexStream({ ???objectMode: true, //输入的是任意形式的数据,而不非是字符串和Buffer
(或Uint8Array
) ???read: readNoop, //下面定义的返回false的函数,会覆写_read(),当要从别地流处读出时调用 ???write: processMessage,//会覆写_write(),当要写入别的流时调用 ?}) ?const events = new SafeEventEmitter() ?const middleware = (req, res, next, end) => { ???// write req to stream ???stream.push(req)//从req流中读出,就会去调用readNoop函数 ???// register request on id map ???idMap[req.id] = { req, res, next, end }//并在数组中将req根据其id记录下来 ?} ?return { events, middleware, stream } ?function readNoop () { ???return false ?} ?function processMessage (res, encoding, cb) {//当要写出时调用 ???let err ???try { ?????const isNotification = !res.id //如果res.id有值,则isNotification为false;否则为true ?????if (isNotification) {//res.id没值或为0 ???????processNotification(res)//触发事件‘notification‘ ?????} else { ???????processResponse(res)//res.id有值 ?????} ???} catch (_err) { ?????err = _err ???} ???// continue processing stream ???cb(err) ?} ?function processResponse(res) {//将流中内容写出到res流 ???const context = idMap[res.id] //查看有没有与相应的res.id相同的ID的流之前读入过 ???if (!context) throw new Error(`StreamMiddleware - Unknown response id ${res.id}`) //如果context为空,则说明相应的id的流并没有读入过,这样写出的时候就不知道要怎么写出了,无end,所以会出错 ???delete idMap[res.id] //如果有读入过,则写出前先清空idMap中的相应内容 ???// copy whole res onto original res ???Object.assign(context.res, res) //然后将现在要写出到的res流覆写context.res,并返回context.res ???// run callback on empty stack, ???// prevent internal stream-handler from catching errors ???setTimeout(context.end) //调用之前读入时写好的end函数来结束写出操作 ?} ?function processNotification(res) {//该事件的监听会在inpage-provider处设置 ???events.emit(‘notification‘, res) ?}}
Object.assign(target, ...sources)
:用于将所有可枚举属性的值从一个或多个源对象复制到目标对象。它将返回目标对象。
json-rpc-middleware-stream/engineStream.js
const DuplexStream = require(‘readable-stream‘).Duplexmodule.exports = createEngineStreamfunction createEngineStream({ engine }) {//engine即RpcEngine ?if (!engine) throw new Error(‘Missing engine parameter!‘) ?const stream = new DuplexStream({ objectMode: true, read, write }) ?// forward notifications ?if (engine.on) { ???engine.on(‘notification‘, (message) => {//监听‘notification‘事件 ?????stream.push(message) //事件被触发的话就将message数据读入stream,调用read函数 ???}) ?} ?return stream ?function read () { ???return false ?} ?function write (req, encoding, cb) {//当写出时调用该函数 ???engine.handle(req, (err, res) => { ?????this.push(res) ???}) ???cb() ?}}
测试:
json-rpc-middleware-stream/test/index.js
const test = require(‘tape‘)const RpcEngine = require(‘json-rpc-engine‘)const createJsonRpcStream = require(‘../index‘)const createEngineStream = require(‘../engineStream‘)test(‘middleware - raw test‘, (t) => { ?const jsonRpcConnection = createJsonRpcStream() ?const req = { id: 1, jsonrpc: ‘2.0‘, method: ‘test‘ } ?const initRes = { id: 1, jsonrpc: ‘2.0‘ } ?const res = { id: 1, jsonrpc: ‘2.0‘, result: ‘test‘ } ?// listen for incomming requests ?jsonRpcConnection.stream.on(‘data‘, (_req) => {//监听data事件 ???t.equal(req, _req, ‘got the expected request‘)//说明触发data事件传来的 ???jsonRpcConnection.stream.write(res)//将流中的与res.id相同的数据写出 ?}) ?// run middleware, expect end fn to be called ?jsonRpcConnection.middleware(req, initRes, () => {//将req流写入createJsonRpcStream ???t.fail(‘should not call next‘) ?}, (err) => { ???t.notOk(err, ‘should not error‘) ???t.deepEqual(initRes, res, ‘got the expected response‘) ???t.end() ?})})test(‘engine to stream - raw test‘, (t) => { ?const engine = new RpcEngine() ?engine.push((req, res, next, end) => { ???res.result = ‘test‘ ???end() ?}) ?const stream = createEngineStream({ engine }) ?const req = { id: 1, jsonrpc: ‘2.0‘, method: ‘test‘ } ?const res = { id: 1, jsonrpc: ‘2.0‘, result: ‘test‘ } ?// listen for incomming requests ?stream.on(‘data‘, (_res) => { ???t.deepEqual(res, _res, ‘got the expected response‘) ???t.end() ?}) ?stream.on(‘error‘, (err) => { ???t.fail(error.message) ?}) ?stream.write(req)})test(‘middleware and engine to stream‘, (t) => {//上面两者的结合 ?// create guest ?const engineA = new RpcEngine() ?const jsonRpcConnection = createJsonRpcStream() ?engineA.push(jsonRpcConnection.middleware) ?// create host ?const engineB = new RpcEngine() ?engineB.push((req, res, next, end) => { ???res.result = ‘test‘ ???end() ?}) ?// connect both ?const clientSideStream = jsonRpcConnection.stream ?const hostSideStream = createEngineStream({ engine: engineB }) ?clientSideStream ?.pipe(hostSideStream) ?.pipe(clientSideStream) ?// request and expected result ?const req = { id: 1, jsonrpc: ‘2.0‘, method: ‘test‘ } ?const res = { id: 1, jsonrpc: ‘2.0‘, result: ‘test‘ } ?engineA.handle(req, (err, _res) => {//req调用jsonRpcConnection.middleware读入clientSideStream,然后hostSideStream从clientSideStream中读入req数据,然后调用engineB的方法写出,所以得到的result: ‘test‘ ???t.notOk(err, ‘does not error‘) ???t.deepEqual(res, _res, ‘got the expected response‘) ???t.end() ?})})test(‘server notification‘, (t) => { ?t.plan(1) ?const jsonRpcConnection = createJsonRpcStream() ?const notif = { jsonrpc: ‘2.0‘, method: ‘test_notif‘ }//这里没有设置id,所以调用write所以会触发processNotification函数,触发‘notification‘事件 ?jsonRpcConnection.events.once(‘notification‘, (message) => { ???t.equals(message.method, notif.method) ???t.end() ?}) ?// receive notification ?jsonRpcConnection.stream.write(notif)})test(‘server notification in stream‘, (t) => { ?const engine = new RpcEngine() ?const stream = createEngineStream({ engine }) ?const notif = { jsonrpc: ‘2.0‘, method: ‘test_notif‘ } ?// listen for incomming requests ?stream.once(‘data‘, (_notif) => { ???t.deepEqual(notif, _notif, ‘got the expected notification‘) ???t.end() ?}) ?stream.on(‘error‘, (err) => { ???t.fail(error.message) ?}) ?engine.emit(‘notification‘, notif)//将触发createEngineStream中的‘notification‘事件,notif将被读入stream,将触发data事件})
MetaMask/json-rpc-middleware-stream
原文地址:https://www.cnblogs.com/wanghui-garcia/p/9885113.html