Skip to content

Commit 0755cc4

Browse files
committed
Initial version of multi-threaded writers.
* Create iperf_send_mt and iperf_recv_mt, the multi-threaded versions of network I/O functions. These handle a single connection only and do not attempt to coordinate timing (for flow control) with any other threads. * Make client and server thread functions call the new multi-threaded network I/O functions. * Remove all network I/O for the test streams from the main thread. * fd_set objects in test object only apply to sockets used by the main thread (not the test streams in the worker threads). Outstanding issues: * No locking of shared data structures at this point. Correctness may be compromised at this point. * Worker threads on the sender side will tend to busy-wait because they do not attempt to sleep while attempting to pace themeselves. * No support (for now) for ending conditions other than time-based (packet-based and byte-based don't work).
1 parent fc86f85 commit 0755cc4

File tree

4 files changed

+48
-101
lines changed

4 files changed

+48
-101
lines changed

src/iperf_api.c

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1861,10 +1861,8 @@ iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP)
18611861
bits_per_second = sp->result->bytes_sent * 8 / seconds;
18621862
if (bits_per_second < sp->test->settings->rate) {
18631863
sp->green_light = 1;
1864-
FD_SET(sp->socket, &sp->test->write_set);
18651864
} else {
18661865
sp->green_light = 0;
1867-
FD_CLR(sp->socket, &sp->test->write_set);
18681866
}
18691867
}
18701868

@@ -1909,10 +1907,10 @@ iperf_check_total_rate(struct iperf_test *test, iperf_size_t last_interval_bytes
19091907
}
19101908

19111909
int
1912-
iperf_send(struct iperf_test *test, fd_set *write_setP)
1910+
iperf_send_mt(struct iperf_stream *sp)
19131911
{
19141912
register int multisend, r, streams_active;
1915-
register struct iperf_stream *sp;
1913+
register struct iperf_test *test = sp->test;
19161914
struct iperf_time now;
19171915
int no_throttle_check;
19181916

@@ -1931,13 +1929,14 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
19311929
if (no_throttle_check)
19321930
iperf_time_now(&now);
19331931
streams_active = 0;
1934-
SLIST_FOREACH(sp, &test->streams, streams) {
1935-
if ((sp->green_light && sp->sender &&
1936-
(write_setP == NULL || FD_ISSET(sp->socket, write_setP)))) {
1937-
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
1938-
break;
1939-
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
1940-
break;
1932+
{
1933+
if (sp->green_light && sp->sender) {
1934+
// XXX If we hit one of these ending conditions maybe
1935+
// want to stop even trying to send something?
1936+
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
1937+
break;
1938+
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
1939+
break;
19411940
if ((r = sp->snd(sp)) < 0) {
19421941
if (r == NET_SOFTERROR)
19431942
break;
@@ -1957,35 +1956,24 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
19571956
}
19581957
if (!no_throttle_check) { /* Throttle check if was not checked for each send */
19591958
iperf_time_now(&now);
1960-
SLIST_FOREACH(sp, &test->streams, streams)
1961-
if (sp->sender)
1962-
iperf_check_throttle(sp, &now);
1959+
if (sp->sender)
1960+
iperf_check_throttle(sp, &now);
19631961
}
1964-
if (write_setP != NULL)
1965-
SLIST_FOREACH(sp, &test->streams, streams)
1966-
if (FD_ISSET(sp->socket, write_setP))
1967-
FD_CLR(sp->socket, write_setP);
1968-
19691962
return 0;
19701963
}
19711964

19721965
int
1973-
iperf_recv(struct iperf_test *test, fd_set *read_setP)
1966+
iperf_recv_mt(struct iperf_stream *sp)
19741967
{
19751968
int r;
1976-
struct iperf_stream *sp;
1969+
struct iperf_test *test = sp->test;
19771970

1978-
SLIST_FOREACH(sp, &test->streams, streams) {
1979-
if (FD_ISSET(sp->socket, read_setP) && !sp->sender) {
19801971
if ((r = sp->rcv(sp)) < 0) {
19811972
i_errno = IESTREAMREAD;
19821973
return r;
19831974
}
19841975
test->bytes_received += r;
19851976
++test->blocks_received;
1986-
FD_CLR(sp->socket, read_setP);
1987-
}
1988-
}
19891977

19901978
return 0;
19911979
}

src/iperf_api.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,8 @@ void build_tcpinfo_message(struct iperf_interval_results *r, char *message);
310310

311311
int iperf_set_send_state(struct iperf_test *test, signed char state);
312312
void iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP);
313-
int iperf_send(struct iperf_test *, fd_set *) /* __attribute__((hot)) */;
314-
int iperf_recv(struct iperf_test *, fd_set *);
313+
int iperf_send_mt(struct iperf_stream *) /* __attribute__((hot)) */;
314+
int iperf_recv_mt(struct iperf_stream *);
315315
void iperf_catch_sigend(void (*handler)(int));
316316
void iperf_got_sigend(struct iperf_test *test) __attribute__ ((noreturn));
317317
void usage(void);

src/iperf_client_api.c

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,28 @@
5252
#endif /* HAVE_TCP_CONGESTION */
5353

5454
void *
55-
iperf_client_worker_start(void *s) {
55+
iperf_client_worker_run(void *s) {
5656
struct iperf_stream *sp = (struct iperf_stream *) s;
5757
struct iperf_test *test = sp->test;
5858

5959
while (! (test->done)) {
60-
if (test->debug_level >= DEBUG_LEVEL_INFO) {
61-
iperf_printf(test, "Thread FD %d\n", sp->socket);
60+
if (sp->sender) {
61+
if (iperf_send_mt(sp) < 0) {
62+
goto cleanup_and_fail;
63+
}
64+
}
65+
else {
66+
if (iperf_recv_mt(sp) < 0) {
67+
goto cleanup_and_fail;
68+
}
6269
}
63-
sleep(1);
6470
}
6571
return NULL;
72+
73+
cleanup_and_fail:
74+
/* XXX */
75+
test->done = 0;
76+
return NULL;
6677
}
6778

6879
int
@@ -129,12 +140,6 @@ iperf_create_streams(struct iperf_test *test, int sender)
129140
}
130141
#endif /* HAVE_TCP_CONGESTION */
131142

132-
if (sender)
133-
FD_SET(s, &test->write_set);
134-
else
135-
FD_SET(s, &test->read_set);
136-
if (s > test->max_fd) test->max_fd = s;
137-
138143
sp = iperf_new_stream(test, s, sender);
139144
if (!sp)
140145
return -1;
@@ -643,7 +648,7 @@ iperf_run_client(struct iperf_test * test)
643648
}
644649

645650
SLIST_FOREACH(sp, &test->streams, streams) {
646-
if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_start, sp) != 0) {
651+
if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) {
647652
i_errno = IEPTHREADCREATE;
648653
goto cleanup_and_fail;
649654
}
@@ -667,24 +672,6 @@ iperf_run_client(struct iperf_test * test)
667672
}
668673
}
669674

670-
671-
if (test->mode == BIDIRECTIONAL)
672-
{
673-
if (iperf_send(test, &write_set) < 0)
674-
goto cleanup_and_fail;
675-
if (iperf_recv(test, &read_set) < 0)
676-
goto cleanup_and_fail;
677-
} else if (test->mode == SENDER) {
678-
// Regular mode. Client sends.
679-
if (iperf_send(test, &write_set) < 0)
680-
goto cleanup_and_fail;
681-
} else {
682-
// Reverse mode. Client receives.
683-
if (iperf_recv(test, &read_set) < 0)
684-
goto cleanup_and_fail;
685-
}
686-
687-
688675
/* Run the timers. */
689676
iperf_time_now(&now);
690677
tmr_run(&now);
@@ -736,15 +723,6 @@ iperf_run_client(struct iperf_test * test)
736723
goto cleanup_and_fail;
737724
}
738725
}
739-
// If we're in reverse mode, continue draining the data
740-
// connection(s) even if test is over. This prevents a
741-
// deadlock where the server side fills up its pipe(s)
742-
// and gets blocked, so it can't receive state changes
743-
// from the client side.
744-
else if (test->mode == RECEIVER && test->state == TEST_END) {
745-
if (iperf_recv(test, &read_set) < 0)
746-
goto cleanup_and_fail;
747-
}
748726
}
749727

750728
/* Cancel receiver threads */

src/iperf_server_api.c

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,28 @@
6767
#endif /* HAVE_TCP_CONGESTION */
6868

6969
void *
70-
iperf_server_worker_start(void *s) {
70+
iperf_server_worker_run(void *s) {
7171
struct iperf_stream *sp = (struct iperf_stream *) s;
7272
struct iperf_test *test = sp->test;
7373

7474
while (! (test->done)) {
75-
if (test->debug_level >= DEBUG_LEVEL_INFO) {
76-
iperf_printf(test, "Thread FD %d\n", sp->socket);
75+
if (sp->sender) {
76+
if (iperf_send_mt(sp) < 0) {
77+
goto cleanup_and_fail;
78+
}
79+
}
80+
else {
81+
if (iperf_recv_mt(sp) < 0) {
82+
goto cleanup_and_fail;
83+
}
7784
}
78-
sleep(1);
7985
}
8086
return NULL;
87+
88+
cleanup_and_fail:
89+
/* XXX */
90+
test->done = 0;
91+
return NULL;
8192
}
8293

8394
int
@@ -746,11 +757,6 @@ iperf_run_server(struct iperf_test *test)
746757
return -1;
747758
}
748759

749-
if (sp->sender)
750-
FD_SET(s, &test->write_set);
751-
else
752-
FD_SET(s, &test->read_set);
753-
754760
if (s > test->max_fd) test->max_fd = s;
755761

756762
/*
@@ -844,7 +850,7 @@ iperf_run_server(struct iperf_test *test)
844850
};
845851

846852
SLIST_FOREACH(sp, &test->streams, streams) {
847-
if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_start, sp) != 0) {
853+
if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) {
848854
i_errno = IEPTHREADCREATE;
849855
cleanup_server(test);
850856
return -1;
@@ -862,31 +868,6 @@ iperf_run_server(struct iperf_test *test)
862868
};
863869
}
864870
}
865-
866-
if (test->state == TEST_RUNNING) {
867-
if (test->mode == BIDIRECTIONAL) {
868-
if (iperf_recv(test, &read_set) < 0) {
869-
cleanup_server(test);
870-
return -1;
871-
}
872-
if (iperf_send(test, &write_set) < 0) {
873-
cleanup_server(test);
874-
return -1;
875-
}
876-
} else if (test->mode == SENDER) {
877-
// Reverse mode. Server sends.
878-
if (iperf_send(test, &write_set) < 0) {
879-
cleanup_server(test);
880-
return -1;
881-
}
882-
} else {
883-
// Regular mode. Server receives.
884-
if (iperf_recv(test, &read_set) < 0) {
885-
cleanup_server(test);
886-
return -1;
887-
}
888-
}
889-
}
890871
}
891872

892873
if (result == 0 ||

0 commit comments

Comments
 (0)