Skip to content

Commit ca862d7

Browse files
committed
[net2] Add Socket.setTimeout
Still seeing crashes and performance problems.
1 parent 4635ed7 commit ca862d7

File tree

1 file changed

+191
-5
lines changed

1 file changed

+191
-5
lines changed

lib/net.js

Lines changed: 191 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,167 @@ var EINPROGRESS = process.EINPROGRESS;
3434
var ENOENT = process.ENOENT;
3535
var END_OF_FILE = 0;
3636

37+
38+
// IDLE TIMEOUTS
39+
//
40+
// Because often many sockets will have the same idle timeout we will not
41+
// use one timeout watcher per socket. It is too much overhead. Instead
42+
// we'll use a single watcher for all sockets with the same timeout value
43+
// and a linked list. This technique is described in the libev manual:
44+
// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
45+
46+
47+
var timeout = new (function () {
48+
// Object containing all lists, timers
49+
// key = time in milliseconds
50+
// value = list
51+
var lists = {};
52+
53+
// show the most idle socket
54+
function peek (list) {
55+
if (list._idlePrev == list) return null;
56+
return list._idlePrev;
57+
}
58+
59+
60+
// remove the most idle socket from the list
61+
function shift (list) {
62+
var first = list._idlePrev;
63+
remove(first);
64+
return first;
65+
}
66+
67+
68+
// remove a socket from its list
69+
function remove (socket) {
70+
socket._idleNext._idlePrev = socket._idlePrev;
71+
socket._idlePrev._idleNext = socket._idleNext;
72+
}
73+
74+
75+
// remove a socket from its list and place at the end.
76+
function append (list, socket) {
77+
remove(socket);
78+
socket._idleNext = list._idleNext;
79+
socket._idleNext._idlePrev = socket;
80+
socket._idlePrev = list
81+
list._idleNext = socket;
82+
}
83+
84+
85+
function normalize (msecs) {
86+
if (!msecs || msecs <= 0) return 0;
87+
// round up to one sec
88+
if (msecs < 1000) return 1000;
89+
// round down to nearest second.
90+
return msecs - (msecs % 1000);
91+
}
92+
93+
// the main function - creates lists on demand and the watchers associated
94+
// with them.
95+
function insert (socket, msecs) {
96+
socket._idleStart = process.now;
97+
socket._idleTimeout = msecs;
98+
99+
if (!msecs) return;
100+
101+
var list;
102+
103+
if (lists[msecs]) {
104+
list = lists[msecs];
105+
} else {
106+
list = new process.Timer();
107+
list._idleNext = list;
108+
list._idlePrev = list;
109+
110+
lists[msecs] = list;
111+
112+
list.callback = function () {
113+
sys.puts('timeout callback ' + msecs);
114+
// TODO - don't stop and start the watcher all the time.
115+
// just set its repeat
116+
var now = process.now;
117+
var first;
118+
while (first = peek(list)) {
119+
var diff = now - first._idleStart;
120+
if (diff < msecs) {
121+
list.again(msecs - diff);
122+
sys.puts(msecs + ' list wait');
123+
return;
124+
} else {
125+
remove(first);
126+
assert(first != peek(list));
127+
first.emit('timeout');
128+
first.forceClose(new Error('idle timeout'));
129+
}
130+
}
131+
sys.puts(msecs + ' list empty');
132+
assert(list._idleNext == list); // list is empty
133+
list.stop();
134+
};
135+
}
136+
137+
if (list._idleNext == list) {
138+
// if empty (re)start the timer
139+
list.again(msecs);
140+
}
141+
142+
append(list, socket);
143+
assert(list._idleNext != list); // list is not empty
144+
}
145+
146+
147+
var unenroll = this.unenroll = function (socket) {
148+
socket._idleNext._idlePrev = socket._idlePrev;
149+
socket._idlePrev._idleNext = socket._idleNext;
150+
151+
var list = lists[socket._idleTimeout];
152+
// if empty then stop the watcher
153+
//sys.puts('unenroll');
154+
if (list && list._idlePrev == list) {
155+
//sys.puts('unenroll: list empty');
156+
list.stop();
157+
}
158+
};
159+
160+
161+
// Does not start the time, just sets up the members needed.
162+
this.enroll = function (socket, msecs) {
163+
// if this socket was already in a list somewhere
164+
// then we should unenroll it from that
165+
if (socket._idleNext) unenroll(socket);
166+
167+
socket._idleTimeout = msecs;
168+
socket._idleNext = socket;
169+
socket._idlePrev = socket;
170+
};
171+
172+
// call this whenever the socket is active (not idle)
173+
// it will reset its timeout.
174+
this.active = function (socket) {
175+
var msecs = socket._idleTimeout;
176+
if (msecs) {
177+
var list = lists[msecs];
178+
if (socket._idleNext == socket) {
179+
insert(socket, msecs);
180+
} else {
181+
// inline append
182+
socket._idleStart = process.now;
183+
socket._idleNext._idlePrev = socket._idlePrev;
184+
socket._idlePrev._idleNext = socket._idleNext;
185+
socket._idleNext = list._idleNext;
186+
socket._idleNext._idlePrev = socket;
187+
socket._idlePrev = list
188+
list._idleNext = socket;
189+
}
190+
}
191+
};
192+
})();
193+
194+
195+
196+
197+
37198
// This is a free list to avoid creating so many of the same object.
38199

39200
function FreeList (name, max, constructor) {
@@ -43,36 +204,41 @@ function FreeList (name, max, constructor) {
43204
this.list = [];
44205
}
45206

207+
46208
FreeList.prototype.alloc = function () {
47209
//debug("alloc " + this.name + " " + this.list.length);
48210
return this.list.length ? this.list.shift()
49211
: this.constructor.apply(this, arguments);
50-
}
212+
};
213+
51214

52215
FreeList.prototype.free = function (obj) {
53216
//debug("free " + this.name + " " + this.list.length);
54217
if (this.list.length < this.max) {
55218
this.list.push(obj);
56219
}
57-
}
220+
};
58221

59222

60223
var ioWatchers = new FreeList("iowatcher", 100, function () {
61224
return new IOWatcher();
62225
});
63226

227+
64228
var nb = 0;
65229
var buffers = new FreeList("buffer", 100, function (l) {
66-
return new Buffer(500);
230+
return new Buffer(l);
67231
});
68232

233+
69234
// Allocated on demand.
70235
var recvBuffer = null;
71236
function allocRecvBuffer () {
72237
recvBuffer = new Buffer(40*1024);
73238
recvBuffer.used = 0;
74239
}
75240

241+
76242
function _doFlush () {
77243
var socket = this.socket;
78244
// Socket becomes writeable on connect() but don't flush if there's
@@ -90,6 +256,8 @@ function _doFlush () {
90256
}
91257

92258
function initSocket (self) {
259+
timeout.enroll(self, 60*1000); // default timeout, 60 seconds
260+
93261
self._readWatcher = ioWatchers.alloc();
94262
self._readWatcher.callback = function () {
95263
// If this is the first recv (recvBuffer doesn't exist) or we've used up
@@ -140,6 +308,9 @@ function initSocket (self) {
140308

141309
if (!self.writable) self.forceClose();
142310
} else if (bytesRead > 0) {
311+
312+
timeout.active(self);
313+
143314
var start = recvBuffer.used;
144315
var end = recvBuffer.used + bytesRead;
145316

@@ -215,6 +386,7 @@ exports.createConnection = function (port, host) {
215386

216387
Object.defineProperty(Socket.prototype, 'readyState', {
217388
get: function () {
389+
sys.error('readyState is depricated. Use stream.readable or stream.writable');
218390
if (this.readable && this.writable) {
219391
return 'open';
220392
} else if (this.readable && !this.writable){
@@ -396,6 +568,9 @@ Socket.prototype.flush = function () {
396568
return false;
397569
}
398570

571+
timeout.active(self);
572+
573+
399574
if (bytesWritten === null) {
400575
// EAGAIN
401576
debug('write EAGAIN');
@@ -459,6 +634,8 @@ Socket.prototype.connect = function () {
459634

460635
var self = this;
461636
if (self.fd) throw new Error('Socket already opened');
637+
638+
timeout.active(socket);
462639

463640
if (typeof(arguments[0]) == 'string') {
464641
self.fd = socket('unix');
@@ -477,28 +654,34 @@ Socket.prototype.connect = function () {
477654
}
478655
};
479656

480-
var xxx = 0;
481-
482657

483658
Socket.prototype.address = function () {
484659
return getsockname(this.fd);
485660
};
486661

662+
487663
Socket.prototype.setNoDelay = function (v) {
488664
if (this.type == 'tcp') setNoDelay(this.fd, v);
489665
};
490666

491667

668+
Socket.prototype.setTimeout = function (msecs) {
669+
timeout.enroll(this, msecs);
670+
};
671+
672+
492673
Socket.prototype.pause = function () {
493674
this._readWatcher.stop();
494675
};
495676

677+
496678
Socket.prototype.resume = function () {
497679
if (!this.fd) throw new Error('Cannot resume() closed Socket.');
498680
this._readWatcher.set(this.fd, true, false);
499681
this._readWatcher.start();
500682
};
501683

684+
502685
Socket.prototype.forceClose = function (exception) {
503686
// recvBuffer is shared between sockets, so don't need to free it here.
504687
var self = this;
@@ -521,6 +704,8 @@ Socket.prototype.forceClose = function (exception) {
521704
this._readWatcher = null;
522705
}
523706

707+
timeout.unenroll(this);
708+
524709
if (this.fd) {
525710
close(this.fd);
526711
debug('close ' + this.fd);
@@ -580,6 +765,7 @@ function Server (listener) {
580765
// The 'connect' event probably should be removed for server-side
581766
// sockets. It's redundent.
582767
peer.emit('connect');
768+
timeout.active(peer);
583769
}
584770
};
585771
}

0 commit comments

Comments
 (0)