Skip to content

Commit 8dce722

Browse files
authored
Merge pull request #121 from peiran18/collab-service
Collab service
2 parents 6effc8e + c00606b commit 8dce722

File tree

6 files changed

+71
-25
lines changed

6 files changed

+71
-25
lines changed

backend/collab-service/src/index.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import express from 'express';
44
import cors from 'cors';
55
import { createServer } from 'http';
66
import { Server } from 'socket.io';
7-
import { connectProducer, sendMessage } from './kafka/producer';
7+
import { connectProducer, sendCodeMessage, sendChatMessage } from './kafka/producer';
88
import { connectRoomConsumer } from './kafka/roomConsumer';
99
import { connectCodeConsumer } from './kafka/codeConsumer';
10-
import { connectChatConsumer } from './kafka/chatConsumer'; // Import the new chat consumer
11-
import { roomManager } from './models/room';
10+
import { connectChatConsumer } from './kafka/chatConsumer';
11+
import { roomManager } from './services/roomManager';
1212
import redis from './redisClient';
1313

1414
const app = express();
@@ -49,7 +49,7 @@ io.on('connection', (socket) => {
4949
return;
5050
}
5151

52-
await sendMessage('collab-code', { key: username, value: code });
52+
await sendCodeMessage('collab-code', { key: username, value: code });
5353
});
5454

5555
socket.on('get-room-details', async (roomId: string, callback) => {
@@ -82,6 +82,10 @@ io.on('connection', (socket) => {
8282
redis.hdel(socketsToUsersKey, socket.id);
8383
});
8484

85+
socket.on('disconnect-collab', async () => {
86+
handleDisconnect(socket.id);
87+
});
88+
8589
// New event handler for chat messages
8690
socket.on('chat-message', async (message: string) => {
8791
const username = await redis.hget(socketsToUsersKey, socket.id);
@@ -91,7 +95,7 @@ io.on('connection', (socket) => {
9195
}
9296

9397
// Send the chat message to Kafka
94-
await sendMessage('collab-chat', { key: username, value: message });
98+
await sendChatMessage('collab-chat', { key: username, value: message });
9599
});
96100
});
97101

backend/collab-service/src/kafka/chatConsumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
2-
import { roomManager } from '../models/room';
2+
import { roomManager } from '../services/roomManager';
33
import redis from '../redisClient';
44

55
const kafka = new Kafka({

backend/collab-service/src/kafka/producer.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,26 @@ const kafka = new Kafka({
88
const producer: Producer = kafka.producer();
99

1010
export async function connectProducer(): Promise<void> {
11-
await producer.connect();
11+
await producer.connect();
1212
console.log('Producer connected');
1313
}
1414

15-
export async function sendMessage(topic: string, message: { key: string; value: any }): Promise<void> {
15+
export async function sendCodeMessage(topic: string, message: { key: string; value: any }): Promise<void> {
1616
const result = await producer.send({
1717
topic,
1818
messages: [{ key: message.key, value: JSON.stringify(message.value) }],
1919
});
2020
console.log(`Sent message: ${JSON.stringify(result)}`);
2121
}
2222

23+
export async function sendChatMessage(topic: string, message: { key: string; value: string }): Promise<void> {
24+
const result = await producer.send({
25+
topic,
26+
messages: [{ key: message.key, value: message.value }],
27+
});
28+
console.log(`Sent chat message: ${JSON.stringify(result)}`);
29+
}
30+
2331
export async function disconnectProducer(): Promise<void> {
2432
await producer.disconnect();
2533
console.log('Producer disconnected');

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ services:
4040
- "8888:8888"
4141
environment:
4242
KAFKA_BROKER: kafka:9092
43-
KAFKA_TOPIC: "collab-code,collab-room"
43+
KAFKA_TOPIC: "collab-code,collab-room,collab-chat"
4444
depends_on:
4545
- kafka
4646
networks:

frontend/src/pages/CollaborationPage.js

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ function CollaborationPage() {
1818
const isRemoteChange = useRef(false); // Flag to prevent infinite loop, true if change to editor is from remote (other user)
1919
const timeoutRef = useRef(null); // Timeout reference for the read-only state of the editor
2020
const countdownRef = useRef(null); // Timeout reference for the countdown when user gets kicked out
21+
const messagesEndRef = useRef(null); // Reference to the end of the chat messages
2122

2223
const [code, setCode] = useState('');
2324
const [question, setQuestion] = useState('');
@@ -29,6 +30,7 @@ function CollaborationPage() {
2930
const [userId, setUserId] = useState('');
3031
const [chatMessages, setChatMessages] = useState([]);
3132
const [newMessage, setNewMessage] = useState('');
33+
const [currentUsername, setCurrentUsername] = useState('');
3234

3335
function formatQuestion(question) {
3436
return `
@@ -50,6 +52,7 @@ ${question.questionDescription}
5052
const data = await verifyToken(token);
5153
const username = data.data.username;
5254
setUserId(data.data.id);
55+
setCurrentUsername(username);
5356

5457
console.log("Username:", username);
5558

@@ -110,11 +113,24 @@ ${question.questionDescription}
110113
});
111114

112115
// Listen for chat messages
113-
collabService.onChatMessage((data) => {
116+
const handleChatMessage = (data) => {
114117
setChatMessages((prevMessages) => [...prevMessages, { sender: data.sender, message: data.message }]);
115-
});
118+
};
119+
120+
collabService.onChatMessage(handleChatMessage);
121+
122+
return () => {
123+
collabService.offChatMessage(handleChatMessage);
124+
};
116125
}, []);
117126

127+
// Auto-scroll to the bottom when new messages are added
128+
useEffect(() => {
129+
if (messagesEndRef.current) {
130+
messagesEndRef.current.scrollIntoView({ behavior: 'smooth' });
131+
}
132+
}, [chatMessages]);
133+
118134
// Kick user out if other user disconnects
119135
useEffect(() => {
120136
collabService.onOtherUserDisconnect(() => {
@@ -215,19 +231,27 @@ ${question.questionDescription}
215231

216232
{/* Chatbox at the bottom */}
217233
<div className="h-1/4 w-full border-t border-gray-700 flex flex-col bg-gray-800">
218-
<div className="flex-grow overflow-y-auto p-3">
234+
<div className="flex-grow overflow-y-auto overflow-x-hidden p-3">
219235
{chatMessages.map((msg, index) => (
220-
<div key={index} className={`mb-2 ${msg.sender === 'You' ? 'text-right' : 'text-left'}`}>
221-
<span className="text-sm text-white font-semibold">{msg.sender}:</span>
222-
<p className="text-base text-white">{msg.message}</p>
236+
<div key={index} className={`flex flex-col mb-2 ${msg.sender === 'You' ? 'items-end' : 'items-start'}`}>
237+
<div className={`text-sm font-semibold text-white ${msg.sender === 'You' ? 'text-right' : 'text-left'}`}>
238+
{msg.sender}
239+
</div>
240+
<div
241+
className={`max-w-md p-2 rounded-lg break-words ${msg.sender === 'You' ? 'bg-blue-600 text-white text-right' : 'bg-gray-700 text-white'
242+
}`}
243+
>
244+
<p>{msg.message}</p>
245+
</div>
223246
</div>
224247
))}
248+
<div ref={messagesEndRef} />
225249
</div>
226250
<div className="p-3 border-t border-gray-700">
227251
<div className="flex">
228252
<input
229253
type="text"
230-
className="flex-grow bg-gray-900 text-white p-2 rounded-l-md focus:outline-none"
254+
className="input input-bordered w-full bg-white text-black rounded-l-full rounded-r-full"
231255
placeholder="Type a message..."
232256
value={newMessage}
233257
onChange={(e) => setNewMessage(e.target.value)}
@@ -238,7 +262,7 @@ ${question.questionDescription}
238262
}}
239263
/>
240264
<button
241-
className="bg-blue-600 text-white p-2 rounded-r-md"
265+
className="btn btn-primary ml-2 rounded-l-full rounded-r-full"
242266
onClick={handleSendMessage}
243267
>
244268
Send

frontend/src/services/collabService.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ import io from 'socket.io-client';
22

33
const socket = io('http://localhost:8888');
44

5+
const chatMessageCallbacks = new Set();
6+
7+
let chatListenerAdded = false; // Flag to prevent duplicate listeners
8+
59
const collabService = {
610
register: (username) => {
711
return new Promise((resolve, reject) => {
@@ -62,16 +66,22 @@ const collabService = {
6266
});
6367
},
6468

65-
// New method to send chat messages
66-
sendChatMessage: (message) => {
67-
socket.emit('chat-message', message);
69+
onChatMessage: (callback) => {
70+
if (!chatListenerAdded) {
71+
socket.on('chat-message', (data) => {
72+
chatMessageCallbacks.forEach((cb) => cb(data));
73+
});
74+
chatListenerAdded = true;
75+
}
76+
chatMessageCallbacks.add(callback); // Add to the Set
6877
},
6978

70-
// New method to receive chat messages
71-
onChatMessage: (callback) => {
72-
socket.on('chat-message', (data) => {
73-
callback(data);
74-
});
79+
offChatMessage: (callback) => {
80+
chatMessageCallbacks.delete(callback); // Remove specific callback
81+
},
82+
83+
sendChatMessage: (message) => {
84+
socket.emit('chat-message', message);
7585
},
7686
};
7787

0 commit comments

Comments
 (0)