Skip to content

Commit 6f9f36e

Browse files
committed
feat: 🎸 improve publish parsing
1 parent 4954504 commit 6f9f36e

File tree

3 files changed

+40
-47
lines changed

3 files changed

+40
-47
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
const {MqttDecoder} = require('../../es6/MqttDecoder');
2+
const {connectShort, publishSample} = require('../packets');
3+
4+
const parser = new MqttDecoder();
5+
const max = 10e6;
6+
let i;
7+
const start = Date.now() / 1000;
8+
9+
for (i = 0; i < max; i++) {
10+
parser.push(Buffer.from(publishSample));
11+
while (parser.parse()) {}
12+
}
13+
14+
const time = Date.now() / 1000 - start;
15+
console.log('Total packets', max);
16+
console.log('Total time', Math.round(time * 100) / 100);
17+
console.log('Packets/s', (Math.round(max / time / 1e4) / 1e2) + 'M');
18+
console.log('Memory used', (process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2) + ' Mb');

‎src/MqttDecoder.ts

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import {BufferList} from './BufferList';
22
import {ERROR, PACKET_TYPE} from './enums';
33
import {PacketConnack, parseConnack} from './packets/connack';
44
import {PacketConnect, parseConnect} from './packets/connect';
5-
import {PacketPublish, parsePublish} from './packets/publish';
5+
import {PacketPublish} from './packets/publish';
66
import {PacketPuback, parsePuback} from './packets/puback';
77
import {PacketPubrec, parsePubrec} from './packets/pubrec';
88
import {PacketPubrel, parsePubrel} from './packets/pubrel';
@@ -15,17 +15,14 @@ import {PacketPingreq} from './packets/pingreq';
1515
import {PacketPingresp} from './packets/pingresp';
1616
import {PacketDisconnect, parseDisconnect} from './packets/disconnect';
1717
import {PacketAuth, parseAuth} from './packets/auth';
18-
import { parseBinary, parseProps } from './util/parse';
19-
import { Properties } from './types';
18+
import {parseBinary, parseProps} from './util/parse';
19+
import {Properties} from './types';
2020

2121
const enum DECODER_STATE {
2222
HEADER = 0,
2323
DATA = 1,
2424
}
2525

26-
const parsers = new Map();
27-
parsers.set(PACKET_TYPE.PUBLISH, parsePublish);
28-
2926
export class MqttDecoder {
3027
/** Keeps track of which part message framing are we in. */
3128
private state: DECODER_STATE = DECODER_STATE.HEADER;
@@ -126,15 +123,14 @@ export class MqttDecoder {
126123

127124
if (this.state !== DECODER_STATE.DATA) return;
128125

129-
const {l: length} = this;
126+
const {b, l} = this;
130127
let offset = this.offset;
131-
const end = offset + length;
132-
if (list.length < end) return;
128+
const packetEndOffset = offset + l;
129+
if (list.length < packetEndOffset) return;
133130

134131
this.state = DECODER_STATE.HEADER;
135132
this.offset = 0;
136133

137-
const {b, l} = this;
138134
const type: PACKET_TYPE = (b >> 4) as PACKET_TYPE;
139135
switch (type) {
140136
case PACKET_TYPE.PUBLISH: {
@@ -151,80 +147,80 @@ export class MqttDecoder {
151147
p = props;
152148
offset += size;
153149
}
154-
const d = list.slice(offset, list.length);
150+
const d = list.slice(offset, packetEndOffset);
155151
const t = topic.toString('utf8');
156-
list.consume(end);
152+
list.consume(packetEndOffset);
157153
return new PacketPublish(b, l, t, i, p, d);
158154
}
159155
case PACKET_TYPE.CONNECT: {
160156
const packet = parseConnect(b, l, list, offset);
161157
this.version = packet.v;
162-
list.consume(end);
158+
list.consume(packetEndOffset);
163159
return packet;
164160
}
165161
case PACKET_TYPE.CONNACK: {
166162
const packet = parseConnack(b, l, list, this.version, offset);
167-
list.consume(end);
163+
list.consume(packetEndOffset);
168164
return packet;
169165
}
170166
case PACKET_TYPE.PUBACK: {
171167
const packet = parsePuback(b, l, list, this.version, offset);
172-
list.consume(end);
168+
list.consume(packetEndOffset);
173169
return packet;
174170
}
175171
case PACKET_TYPE.PUBREC: {
176172
const packet = parsePubrec(b, l, list, this.version, offset);
177-
list.consume(end);
173+
list.consume(packetEndOffset);
178174
return packet;
179175
}
180176
case PACKET_TYPE.PUBREL: {
181177
const packet = parsePubrel(b, l, list, this.version, offset);
182-
list.consume(end);
178+
list.consume(packetEndOffset);
183179
return packet;
184180
}
185181
case PACKET_TYPE.PUBCOMP: {
186182
const packet = parsePubcomp(b, l, list, this.version, offset);
187-
list.consume(end);
183+
list.consume(packetEndOffset);
188184
return packet;
189185
}
190186
case PACKET_TYPE.SUBSCRIBE: {
191187
const packet = parseSubscribe(b, l, list, this.version, offset);
192-
list.consume(end);
188+
list.consume(packetEndOffset);
193189
return packet;
194190
}
195191
case PACKET_TYPE.SUBACK: {
196192
const packet = parseSuback(b, l, list, this.version, offset);
197-
list.consume(end);
193+
list.consume(packetEndOffset);
198194
return packet;
199195
}
200196
case PACKET_TYPE.UNSUBSCRIBE: {
201197
const packet = parseUnsubscribe(b, l, list, this.version, offset);
202-
list.consume(end);
198+
list.consume(packetEndOffset);
203199
return packet;
204200
}
205201
case PACKET_TYPE.UNSUBACK: {
206202
const packet = parseUnsuback(b, l, list, this.version, offset);
207-
list.consume(end);
203+
list.consume(packetEndOffset);
208204
return packet;
209205
}
210206
case PACKET_TYPE.PINGREQ: {
211207
const packet = new PacketPingreq(b, l);
212-
list.consume(end);
208+
list.consume(packetEndOffset);
213209
return packet;
214210
}
215211
case PACKET_TYPE.PINGRESP: {
216212
const packet = new PacketPingresp(b, l);
217-
list.consume(end);
213+
list.consume(packetEndOffset);
218214
return packet;
219215
}
220216
case PACKET_TYPE.DISCONNECT: {
221217
const packet = parseDisconnect(b, l, list, this.version, offset);
222-
list.consume(end);
218+
list.consume(packetEndOffset);
223219
return packet;
224220
}
225221
case PACKET_TYPE.AUTH: {
226222
const packet = parseAuth(b, l, list, this.version, offset);
227-
list.consume(end);
223+
list.consume(packetEndOffset);
228224
return packet;
229225
}
230226
default: throw ERROR.MALFORMED_PACKET;

‎src/packets/publish.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import {BufferList} from '../BufferList';
21
import {Packet, PacketHeaderData} from '../packet';
32
import {Properties} from '../types';
4-
import {parseBinary, parseProps} from '../util/parse';
53

64
export interface PacketPublishData extends PacketHeaderData {
75
/** Topic Name. */
@@ -26,22 +24,3 @@ export class PacketPublish extends Packet implements PacketPublishData {
2624
super(b, l);
2725
}
2826
}
29-
30-
export const parsePublish = (b: number, l: number, data: BufferList, version: number, offset: number): PacketPublish => {
31-
const topic = parseBinary(data, offset);
32-
offset += 2 + topic.byteLength;
33-
let i: number = 0;
34-
if (((b >> 1) & 0b11) > 0) {
35-
i = data.readUInt16BE(offset);
36-
offset += 2;
37-
}
38-
let p: Properties = {};
39-
if (version === 5) {
40-
const [props, size] = parseProps(data, offset);
41-
p = props;
42-
offset += size;
43-
}
44-
const d = data.slice(offset, data.length);
45-
const t = topic.toString('utf8');
46-
return new PacketPublish(b, l, t, i, p, d);
47-
};

0 commit comments

Comments
 (0)