Skip to content

Commit fa070a7

Browse files
committed
feat: 🎸 add encoding for PUBLISH packets
1 parent c51b0c8 commit fa070a7

File tree

2 files changed

+147
-0
lines changed

2 files changed

+147
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import {PACKET_TYPE, PROPERTY} from '../../enums';
2+
import {PacketPublish} from '../publish';
3+
import {MqttDecoder} from '../../MqttDecoder';
4+
5+
test('can create a packet', () => {
6+
const packet = PacketPublish.create('topic', 0, {}, Buffer.from([1, 2, 3]));
7+
expect(packet.b >> 4).toBe(PACKET_TYPE.PUBLISH);
8+
expect(packet.t).toBe('topic');
9+
expect(packet.i).toBe(0);
10+
expect(packet.p).toEqual({});
11+
expect(packet.d).toEqual(Buffer.from([1, 2, 3]));
12+
});
13+
14+
test('can serialize a basic packet', () => {
15+
const packet1 = PacketPublish.create('topic', 0, {}, Buffer.from([1, 2, 3]));
16+
const buf = packet1.toBuffer(5);
17+
const decoder = new MqttDecoder();
18+
decoder.version = 5;
19+
decoder.push(buf);
20+
const packet2 = decoder.parse()! as PacketPublish;
21+
expect(packet2.b).toBe(packet1.b);
22+
expect(packet2.t).toBe('topic');
23+
expect(packet2.p).toEqual({});
24+
expect(packet2.d).toEqual(Buffer.from([1, 2, 3]));
25+
expect(packet2).toEqual(packet1);
26+
});
27+
28+
test('can encode packet with QoS = 1', () => {
29+
const packet1 = PacketPublish.create('topic', 1, {}, Buffer.from([1, 2, 3]));
30+
packet1.setQualityOfService(1);
31+
const buf = packet1.toBuffer(5);
32+
const decoder = new MqttDecoder();
33+
decoder.version = 5;
34+
decoder.push(buf);
35+
const packet2 = decoder.parse()! as PacketPublish;
36+
expect(packet2.b).toBe(packet1.b);
37+
expect(packet2.qualityOfService()).toBe(1);
38+
expect(packet2.t).toBe('topic');
39+
expect(packet2.p).toEqual({});
40+
expect(packet2.d).toEqual(Buffer.from([1, 2, 3]));
41+
expect(packet2).toEqual(packet1);
42+
});
43+
44+
test('can encode properties', () => {
45+
const packet1 = PacketPublish.create('topic', 1, {
46+
[PROPERTY.AssignedClientIdentifier]: 'test',
47+
[PROPERTY.UserProperty]: [
48+
['test', 'test'],
49+
],
50+
}, Buffer.from([1, 2, 3]));
51+
packet1.setQualityOfService(1);
52+
const buf = packet1.toBuffer(5);
53+
const decoder = new MqttDecoder();
54+
decoder.version = 5;
55+
decoder.push(buf);
56+
const packet2 = decoder.parse()! as PacketPublish;
57+
expect(packet2.b).toBe(packet1.b);
58+
expect(packet2.qualityOfService()).toBe(1);
59+
expect(packet2.t).toBe('topic');
60+
expect(packet2.p).toEqual({
61+
[PROPERTY.AssignedClientIdentifier]: 'test',
62+
[PROPERTY.UserProperty]: [
63+
['test', 'test'],
64+
],
65+
});
66+
expect(packet2.d).toEqual(Buffer.from([1, 2, 3]));
67+
expect(packet2).toEqual(packet1);
68+
});

‎src/packets/publish.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { PACKET_TYPE } from '../enums';
12
import {Packet, PacketHeaderData} from '../packet';
23
import {Properties} from '../types';
4+
import { genProps } from '../util/genProps/v7';
35

46
export interface PacketPublishData extends PacketHeaderData {
57
/** Topic Name. */
@@ -13,6 +15,20 @@ export interface PacketPublishData extends PacketHeaderData {
1315
}
1416

1517
export class PacketPublish extends Packet implements PacketPublishData {
18+
/**
19+
* @param t Topic
20+
* @param i Packet ID
21+
* @param p Properties
22+
* @param d Payload
23+
*/
24+
static create(
25+
t: string,
26+
i: number,
27+
p: Properties,
28+
d: Buffer,): PacketPublish {
29+
return new PacketPublish(PACKET_TYPE.PUBLISH << 4, 0, t, i, p, d);
30+
}
31+
1632
constructor(
1733
b: number,
1834
l: number,
@@ -23,4 +39,67 @@ export class PacketPublish extends Packet implements PacketPublishData {
2339
) {
2440
super(b, l);
2541
}
42+
43+
public toBuffer(version: number): Buffer {
44+
const payload = this.d;
45+
const lenTopic = Buffer.byteLength(this.t);
46+
const isQosHigh = this.qualityOfService() > 0;
47+
const emitProps = version === 5;
48+
const props = emitProps ? genProps(this.p) : null;
49+
const propsLength = emitProps ? props!.length : 0;
50+
const remainingLength: number =
51+
2 + lenTopic + // topic length
52+
(isQosHigh ? 2 : 0) + // packet ID
53+
propsLength + // properties
54+
payload.length; // payload length
55+
const remainingLengthSize = remainingLength < 128 ? 1 : remainingLength < 16_384 ? 2 : remainingLength < 2_097_152 ? 3 : 4;
56+
const bufferLength = 1 + remainingLengthSize + remainingLength;
57+
const buf = Buffer.allocUnsafe(bufferLength);
58+
this.l = remainingLength;
59+
60+
buf.writeUInt8(this.b);
61+
62+
let offset = 1;
63+
64+
switch (remainingLengthSize) {
65+
case 1:
66+
buf.writeUInt8(remainingLength, 1);
67+
offset = 2;
68+
break;
69+
case 2:
70+
buf.writeUInt16LE(((remainingLength & 0b011111110000000) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
71+
offset = 3;
72+
break;
73+
case 3:
74+
buf.writeUInt16LE(((0b100000000000000 | (remainingLength & 0b011111110000000)) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
75+
buf.writeUInt8((remainingLength >> 14) & 0b01111111, 3);
76+
offset = 4;
77+
break;
78+
case 4:
79+
buf.writeUInt32LE((((((remainingLength >> 21) & 0b01111111) << 8) | (0b10000000 | ((remainingLength >> 14) & 0b01111111))) << 16) |
80+
((0b100000000000000 | (remainingLength & 0b011111110000000)) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
81+
offset = 5;
82+
break;
83+
}
84+
85+
buf.writeUInt16BE(lenTopic, offset);
86+
offset += 2;
87+
buf.write(this.t, offset);
88+
offset += lenTopic;
89+
90+
if (isQosHigh) {
91+
buf.writeUInt16BE(this.i, offset);
92+
offset += 2;
93+
}
94+
95+
if (emitProps) {
96+
props!.copy(buf, offset);
97+
offset += propsLength;
98+
}
99+
100+
payload.copy(buf, offset);
101+
// offset += payload.length;
102+
103+
return buf;
104+
}
26105
}

0 commit comments

Comments
 (0)