Skip to content

Commit 69d4ca4

Browse files
committed
feat: 🎸 add sample server stub
1 parent eceb1f2 commit 69d4ca4

File tree

6 files changed

+485
-0
lines changed

6 files changed

+485
-0
lines changed

‎src/MqttParser.ts

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import {BufferList} from './BufferList';
2+
import {REASON, PACKET_TYPE} from './enums';
3+
import {parseConnack} from './packets/connack';
4+
import {PacketConnect} from './packets/connect';
5+
import {PacketPublish} from './packets/publish';
6+
import {parsePuback} from './packets/puback';
7+
import {parsePubrec} from './packets/pubrec';
8+
import {parsePubrel} from './packets/pubrel';
9+
import {parsePubcomp} from './packets/pubcomp';
10+
import {parseSubscribe} from './packets/subscribe';
11+
import {parseSuback} from './packets/suback';
12+
import {parseUnsubscribe} from './packets/unsubscribe';
13+
import {parseUnsuback} from './packets/unsuback';
14+
import {parseDisconnect} from './packets/disconnect';
15+
import {parseAuth} from './packets/auth';
16+
import {parseBinary} from './util/parse';
17+
import {parseProps} from './util/parseProps';
18+
import {Properties} from './types';
19+
import {MqttRouter} from './MqttRouter';
20+
21+
const enum PARSER_STATE {
22+
HEADER = 0,
23+
DATA = 1,
24+
}
25+
26+
export class MqttParser {
27+
/** Keeps track of which part message framing are we in. */
28+
private state: PARSER_STATE = PARSER_STATE.HEADER;
29+
30+
/** Buffer which contains all unparsed buffered data. */
31+
public list = new BufferList();
32+
33+
/** Current packet first byte. */
34+
public b: number = 0;
35+
36+
/** Current packet length. */
37+
public l: number = 0;
38+
39+
/**
40+
* MQTT protocol version. Defaults to 4 as that is most popular version now.
41+
* This version is automatically set to the version received in CONNECT packet.
42+
*/
43+
public version: number = 4;
44+
45+
private offset: number = 0;
46+
47+
constructor(private readonly router: MqttRouter) {}
48+
49+
/**
50+
* Use this method to push into decoder all new bytes that arrive over the
51+
* socket. Each time you push a chunk it can result in zero or more packets
52+
* returned by `.parse()` method.
53+
*
54+
* @param buf Raw data bytes chunk as received from socket.
55+
*/
56+
public push(buf: Buffer) {
57+
this.list.append(buf);
58+
}
59+
60+
/**
61+
* Back-pressure of unparsed data.
62+
*
63+
* @returns Returns number of bytes of data that has been buffered but not
64+
* parsed yet.
65+
*/
66+
public bufferSize() {
67+
return this.list.length;
68+
}
69+
70+
/**
71+
* @returns Returns a single parsed packet. If there is not enough data in
72+
* the buffer to parse a packet, returns `undefined`.
73+
*/
74+
public parse(): void {
75+
try {
76+
const list = this.list;
77+
if (!list.length) return;
78+
79+
if (this.state === PARSER_STATE.HEADER) {
80+
const length = list.length;
81+
if (length < 2) return;
82+
this.b = list.readUInt8(0);
83+
const b1 = list.readUInt8(1);
84+
if (b1 & 0b10000000) {
85+
if (length < 3) return;
86+
const b2 = list.readUInt8(2);
87+
if (b2 & 0b10000000) {
88+
if (length < 4) return;
89+
const b3 = list.readUInt8(3);
90+
if (b3 & 0b10000000) {
91+
if (length < 5) return;
92+
const b4 = list.readUInt8(4);
93+
this.offset = 5;
94+
this.l = ((b4 & 0b01111111) << 21) + ((b3 & 0b01111111) << 14) + ((b2 & 0b01111111) << 7) + (b1 & 0b01111111);
95+
} else {
96+
this.offset = 4;
97+
this.l = ((b3 & 0b01111111) << 14) + ((b2 & 0b01111111) << 7) + (b1 & 0b01111111);
98+
}
99+
} else {
100+
this.offset = 3;
101+
this.l = ((b2 & 0b01111111) << 7) + (b1 & 0b01111111);
102+
}
103+
} else {
104+
this.offset = 2;
105+
this.l = b1 & 0b01111111;
106+
}
107+
this.state = PARSER_STATE.DATA;
108+
}
109+
110+
if (this.state !== PARSER_STATE.DATA) return;
111+
112+
const {b, l} = this;
113+
let offset = this.offset;
114+
const packetEndOffset = offset + l;
115+
if (list.length < packetEndOffset) return;
116+
117+
this.state = PARSER_STATE.HEADER;
118+
this.offset = 0;
119+
120+
const buf = list.slice(offset, offset + l);
121+
list.consume(packetEndOffset);
122+
123+
const type: PACKET_TYPE = (b >> 4) as PACKET_TYPE;
124+
switch (type) {
125+
case PACKET_TYPE.PUBLISH: {
126+
let offset = 0;
127+
const topic = parseBinary(buf, offset);
128+
offset += 2 + topic.byteLength;
129+
let i: number = 0;
130+
if (((b >> 1) & 0b11) > 0) {
131+
i = buf.readUInt16BE(offset);
132+
offset += 2;
133+
}
134+
let p: Properties = {};
135+
if (this.version === 5) {
136+
const [props, size] = parseProps(buf, offset);
137+
p = props;
138+
offset += size;
139+
}
140+
const d = buf.slice(offset, packetEndOffset);
141+
const t = topic.toString('utf8');
142+
const packet = new PacketPublish(b, l, t, i, p, d);
143+
this.router.onPublish(packet);
144+
return;
145+
}
146+
case PACKET_TYPE.CONNECT: {
147+
offset = 2 + buf.readUInt16BE(0); // Skip "MQTT" or "MQIsdp" protocol name.
148+
const v = buf.readUInt8(offset++);
149+
const f = buf.readUInt8(offset++);
150+
const k = buf.readUInt16BE(offset);
151+
offset += 2;
152+
// const ui32 = buf.readUInt32BE(offset);
153+
// const v = (ui32 & 0xFF000000) >> 24;
154+
// const f = (ui32 & 0xFF0000) >> 16;
155+
// const k = (ui32 & 0xFFFF);
156+
// offset += 4;
157+
const isV5 = v === 5;
158+
let p: Properties = {};
159+
if (isV5) {
160+
const [props, propsSize] = parseProps(buf, offset);
161+
p = props;
162+
offset += propsSize;
163+
}
164+
const clientId = parseBinary(buf, offset);
165+
const id = clientId.toString('utf8');
166+
offset += 2 + clientId.byteLength;
167+
const packet = new PacketConnect(b, l, v, f, k, p, id)
168+
if (packet.willFlag()) {
169+
if (isV5) {
170+
const [props, propsSize] = parseProps(buf, offset);
171+
packet.wp = props;
172+
offset += propsSize;
173+
} else packet.wp = {};
174+
const willTopic = parseBinary(buf, offset);
175+
packet.wt = willTopic.toString('utf8');
176+
offset += 2 + willTopic.byteLength;
177+
const willPayload = parseBinary(buf, offset);
178+
packet.w = willPayload;
179+
offset += 2 + willPayload.byteLength;
180+
}
181+
if (packet.userNameFlag()) {
182+
const userName = parseBinary(buf, offset);
183+
packet.usr = userName.toString('utf8');
184+
offset += 2 + userName.byteLength;
185+
}
186+
if (packet.passwordFlag()) {
187+
const password = parseBinary(buf, offset);
188+
packet.pwd = password;
189+
offset += 2 + password.byteLength;
190+
}
191+
this.version = packet.v;
192+
this.router.onConnect(packet);
193+
return;
194+
}
195+
case PACKET_TYPE.CONNACK: return this.router.onConnack(parseConnack(b, l, buf, this.version));
196+
case PACKET_TYPE.PUBACK: return this.router.onPuback(parsePuback(b, l, buf, this.version));
197+
case PACKET_TYPE.PUBREC: return this.router.onPubrec(parsePubrec(b, l, buf, this.version));
198+
case PACKET_TYPE.PUBREL: return this.router.onPubrel(parsePubrel(b, l, buf, this.version));
199+
case PACKET_TYPE.PUBCOMP: return this.router.onPubcomp(parsePubcomp(b, l, buf, this.version));
200+
case PACKET_TYPE.SUBSCRIBE: return this.router.onSubscribe(parseSubscribe(b, l, buf, this.version));
201+
case PACKET_TYPE.SUBACK: return this.router.onSuback(parseSuback(b, l, buf, this.version));
202+
case PACKET_TYPE.UNSUBSCRIBE: return this.router.onUnsubscribe(parseUnsubscribe(b, l, buf, this.version));
203+
case PACKET_TYPE.UNSUBACK: return this.router.onUnsuback(parseUnsuback(b, l, buf, this.version));
204+
case PACKET_TYPE.PINGREQ: return this.router.onPingreq();
205+
case PACKET_TYPE.PINGRESP: return this.router.onPingresp();
206+
case PACKET_TYPE.DISCONNECT: return this.router.onDisconnect(parseDisconnect(b, l, buf, this.version));
207+
case PACKET_TYPE.AUTH: return this.router.onAuth(parseAuth(b, l, buf, this.version));
208+
default: throw REASON.MalformedPacket;
209+
}
210+
} catch (error) {
211+
throw REASON.ProtocolError;
212+
}
213+
}
214+
}

‎src/MqttRouter.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import type {
2+
PacketAuth,
3+
PacketConnack,
4+
PacketConnect,
5+
PacketDisconnect,
6+
PacketPuback,
7+
PacketPubcomp,
8+
PacketPublish,
9+
PacketPubrec,
10+
PacketPubrel,
11+
PacketSuback,
12+
PacketSubscribe,
13+
PacketUnsuback,
14+
PacketUnsubscribe
15+
} from './packets';
16+
17+
export interface MqttRouter {
18+
onAuth(packet: PacketAuth): void;
19+
onConnack(packet: PacketConnack): void;
20+
onConnect(packet: PacketConnect): void;
21+
onDisconnect(packet: PacketDisconnect): void;
22+
onPingreq(): void;
23+
onPingresp(): void;
24+
onPuback(packet: PacketPuback): void;
25+
onPubcomp(packet: PacketPubcomp): void;
26+
onPublish(packet: PacketPublish): void;
27+
onPubrec(packet: PacketPubrec): void;
28+
onPubrel(packet: PacketPubrel): void;
29+
onSuback(packet: PacketSuback): void;
30+
onSubscribe(packet: PacketSubscribe): void;
31+
onUnsuback(packet: PacketUnsuback): void;
32+
onUnsubscribe(packet: PacketUnsubscribe): void;
33+
}

‎src/MqttTcpConnection.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import {Socket} from 'net';
2+
import {PACKET_TYPE} from './enums';
3+
import {MqttDecoder} from './MqttDecoder';
4+
import {
5+
PacketAuth,
6+
PacketConnect,
7+
PacketDisconnect,
8+
PacketPuback,
9+
PacketPubcomp,
10+
PacketPublish,
11+
PacketPubrec,
12+
PacketPubrel,
13+
PacketSubscribe,
14+
PacketUnsubscribe,
15+
PINGRESP,
16+
} from './packets';
17+
import {SomePacket} from './types';
18+
19+
export interface MqttTcpConnectionBehavior {
20+
onAuth(packet: PacketAuth): void;
21+
onConnect(packet: PacketConnect): void;
22+
onDisconnect(packet: PacketDisconnect): void;
23+
onPuback(packet: PacketPuback): void;
24+
onPubcomp(packet: PacketPubcomp): void;
25+
onPublish(packet: PacketPublish): void;
26+
onPubrec(packet: PacketPubrec): void;
27+
onPubrel(packet: PacketPubrel): void;
28+
onSubscribe(packet: PacketSubscribe): void;
29+
onUnsubscribe(packet: PacketUnsubscribe): void;
30+
}
31+
32+
export class MqttTcpConnection {
33+
protected readonly mqtt: MqttDecoder;
34+
public behavior!: MqttTcpConnectionBehavior
35+
36+
constructor(protected readonly socket: Socket) {
37+
const mqtt = this.mqtt = new MqttDecoder();
38+
socket.addListener('data', (data) => {
39+
mqtt.push(data);
40+
let packet: SomePacket | undefined;
41+
// tslint:disable-next-line no-conditional-assignment
42+
while (packet = mqtt.parse()) this.onPacket(packet);
43+
});
44+
}
45+
46+
protected onPacket(packet: SomePacket) {
47+
const behavior = this.behavior;
48+
const type = packet.type();
49+
switch (type) {
50+
case PACKET_TYPE.AUTH:
51+
behavior.onAuth(packet as PacketAuth);
52+
break;
53+
case PACKET_TYPE.CONNECT:
54+
behavior.onConnect(packet as PacketConnect);
55+
break;
56+
case PACKET_TYPE.DISCONNECT:
57+
behavior.onDisconnect(packet as PacketDisconnect);
58+
break;
59+
case PACKET_TYPE.PINGREQ:
60+
this.socket.write(PINGRESP);
61+
break;
62+
case PACKET_TYPE.PUBACK:
63+
behavior.onPuback(packet as PacketPuback);
64+
break;
65+
case PACKET_TYPE.PUBCOMP:
66+
behavior.onPubcomp(packet as PacketPubcomp);
67+
break;
68+
case PACKET_TYPE.PUBLISH:
69+
behavior.onPublish(packet as PacketPublish);
70+
break;
71+
case PACKET_TYPE.PUBREC:
72+
behavior.onPubrec(packet as PacketPubrec);
73+
break;
74+
case PACKET_TYPE.PUBREL:
75+
behavior.onPubrel(packet as PacketPubrel);
76+
break;
77+
case PACKET_TYPE.SUBSCRIBE:
78+
behavior.onSubscribe(packet as PacketSubscribe);
79+
break;
80+
case PACKET_TYPE.UNSUBSCRIBE:
81+
behavior.onUnsubscribe(packet as PacketUnsubscribe);
82+
break;
83+
default:
84+
this.socket.end();
85+
break;
86+
}
87+
}
88+
89+
public send(packet: SomePacket) {
90+
const buf = packet.toBuffer(this.mqtt.version);
91+
this.socket.write(buf);
92+
}
93+
94+
public async close(buf?: Buffer): Promise<void> {
95+
return new Promise((resolve) => {
96+
if (buf) {
97+
this.socket.end(buf, () => {
98+
resolve();
99+
});
100+
} else {
101+
this.socket.end(() => {
102+
resolve();
103+
});
104+
}
105+
});
106+
}
107+
}

0 commit comments

Comments
 (0)