Skip to content

Commit 40f51ce

Browse files
committed
use concurrent map to solve fatal error: concurrent map writes #8
1 parent 2664ce1 commit 40f51ce

File tree

10 files changed

+112
-168
lines changed

10 files changed

+112
-168
lines changed

cmd/dispose.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func init() {
2424
}
2525

2626
// Dispose is 处理函数
27-
func Dispose(concurrency, totalNumber uint64, request *pkg.Request) {
27+
func Dispose(concurrency int, totalNumber uint64, request *pkg.Request) {
2828

2929
// 设置接收数据缓存
3030
ch := make(chan *pkg.RequestResults, 1000)
@@ -36,7 +36,7 @@ func Dispose(concurrency, totalNumber uint64, request *pkg.Request) {
3636
wgReceiving.Add(1)
3737
go pkg.ReceivingResults(concurrency, ch, &wgReceiving)
3838

39-
for i := uint64(0); i < concurrency; i++ {
39+
for i := 0; i < concurrency; i++ {
4040
wg.Add(1)
4141
switch request.Form {
4242
case pkg.FormTypeHttp:
@@ -59,7 +59,7 @@ func Dispose(concurrency, totalNumber uint64, request *pkg.Request) {
5959
go pkg.WebSocket(i, ch, totalNumber, &wg, request, ws)
6060
case 2:
6161
// 并发建立长链接
62-
go func(i uint64) {
62+
go func(i int) {
6363
// 连接以后再启动协程
6464
ws := pkg.NewWebSocket(request.Url)
6565
err := ws.GetConn()

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func init() {
9898
// will be global for your application.
9999

100100
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file for stress (default is $HOME/.stress.yaml)")
101-
rootCmd.PersistentFlags().Uint64VarP(&pkg.Concurrency, "concurrency", "c", 1, "并发数")
101+
rootCmd.PersistentFlags().IntVarP(&pkg.Concurrency, "concurrency", "c", 1, "并发数")
102102
rootCmd.PersistentFlags().Uint64VarP(&pkg.Number, "number", "n", 1, "单协程的请求数")
103103
rootCmd.PersistentFlags().StringVarP(&pkg.RequestUrl, "requestUrl", "u", "", "curl文件路径")
104104
rootCmd.PersistentFlags().StringVarP(&pkg.Path, "path", "f", "", "read curl file to build test")

pkg/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
// Http is http go link
11-
func Http(chanId uint64, ch chan<- *RequestResults, totalNumber uint64, wg *sync.WaitGroup, request *Request) {
11+
func Http(chanId int, ch chan<- *RequestResults, totalNumber uint64, wg *sync.WaitGroup, request *Request) {
1212

1313
defer func() {
1414
wg.Done()

pkg/http_client.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"crypto/tls"
55
"fmt"
66
"github.com/oldthreefeng/stress/utils"
7-
"github.com/orcaman/concurrent-map"
87
"io"
98
"net/http"
109
"time"
@@ -16,7 +15,7 @@ import (
1615
// body 请求的body
1716
// headers 请求头信息
1817
// timeout 请求超时时间
19-
func HttpRequest(method, url string, body io.Reader, headers cmap.ConcurrentMap, timeout time.Duration) (resp *http.Response, requestTime uint64, err error) {
18+
func HttpRequest(method, url string, body io.Reader, headers utils.ConcurrentMap, timeout time.Duration) (resp *http.Response, requestTime uint64, err error) {
2019

2120
// 跳过证书验证
2221
tr := &http.Transport{
@@ -43,7 +42,7 @@ func HttpRequest(method, url string, body io.Reader, headers cmap.ConcurrentMap,
4342

4443
if _, ok := headers.Get("Content-Type"); !ok {
4544
if headers == nil {
46-
headers = cmap.New()
45+
headers = utils.New()
4746
}
4847
headers.Set("Content-Type","application/x-www-form-urlencoded; charset=utf-8")
4948
}

pkg/request.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package pkg
33
import (
44
"errors"
55
"fmt"
6-
"github.com/orcaman/concurrent-map"
6+
"github.com/oldthreefeng/stress/utils"
77
"io"
88
"net/http"
99
"strings"
@@ -68,7 +68,7 @@ type Request struct {
6868
Url string // Url
6969
Form string // http/webSocket/tcp
7070
Method string // 方法 GET/POST/PUT
71-
Headers cmap.ConcurrentMap // Headers
71+
Headers utils.ConcurrentMap // Headers
7272
Body string // body
7373
Verify string // 验证的方法
7474
Timeout time.Duration // 请求超时时间
@@ -121,7 +121,7 @@ func NewRequest(url string, verify string, timeout time.Duration, debug bool, re
121121

122122
var (
123123
method = DefaultMethod
124-
headers = cmap.New()
124+
headers = utils.New()
125125
body string
126126
)
127127

@@ -213,7 +213,7 @@ func NewDefaultRequest() *Request {
213213
}
214214
}
215215

216-
func getHeaderValue(v string, headers cmap.ConcurrentMap) {
216+
func getHeaderValue(v string, headers utils.ConcurrentMap) {
217217
index := strings.Index(v, ":")
218218
if index < 0 {
219219
return
@@ -273,14 +273,14 @@ func (r *Request) IsParameterLegal() (err error) {
273273
// RequestResults is 请求结果
274274
type RequestResults struct {
275275
Id string // 消息Id
276-
ChanId uint64 // 消息Id
276+
ChanId int // 消息Id
277277
Time uint64 // 请求时间 纳秒
278278
IsSucceed bool // 是否请求成功
279279
ErrCode int // 错误码
280280
}
281281

282282
// SetId is set chanId & id to request results
283-
func (r *RequestResults) SetId(chanId uint64, number uint64) {
283+
func (r *RequestResults) SetId(chanId int, number uint64) {
284284
id := fmt.Sprintf("%d_%d", chanId, number)
285285

286286
r.Id = id

pkg/statistics.go renamed to pkg/satics.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pkg
22

33
import (
44
"fmt"
5+
"github.com/oldthreefeng/stress/utils"
56
"strings"
67
"sync"
78
"time"
@@ -15,7 +16,7 @@ var (
1516
// ReceivingResults is 接收结果并处理
1617
// 统计的时间都是纳秒,显示的时间 都是毫秒
1718
// concurrent 并发数
18-
func ReceivingResults(concurrent uint64, ch <-chan *RequestResults, wg *sync.WaitGroup) {
19+
func ReceivingResults(concurrent int, ch <-chan *RequestResults, wg *sync.WaitGroup) {
1920

2021
defer func() {
2122
wg.Done()
@@ -34,13 +35,13 @@ func ReceivingResults(concurrent uint64, ch <-chan *RequestResults, wg *sync.Wai
3435
successNum uint64 // 成功处理数,code为0
3536
failureNum uint64 // 处理失败数,code不为0
3637
chanIdLen int // 并发数
37-
chanIds = make(map[uint64]bool)
38+
chanIds = utils.New(Concurrency)
3839
)
3940

4041
statTime := uint64(time.Now().UnixNano())
4142

4243
// 错误码/错误个数
43-
var errCode = make(map[int]int)
44+
var errCode = utils.New()
4445

4546
// 定时输出一次计算结果
4647
ticker := time.NewTicker(exportStatisticsTime)
@@ -83,16 +84,18 @@ func ReceivingResults(concurrent uint64, ch <-chan *RequestResults, wg *sync.Wai
8384
}
8485

8586
// 统计错误码
86-
if value, ok := errCode[data.ErrCode]; ok {
87-
errCode[data.ErrCode] = value + 1
87+
strErrCode := fmt.Sprintf("%d", data.ErrCode)
88+
if value, ok := errCode.Get(strErrCode); ok {
89+
errCode.Set(strErrCode, value.(int)+1)
8890
} else {
89-
errCode[data.ErrCode] = 1
91+
errCode.Set(strErrCode, 1)
9092
}
91-
92-
if _, ok := chanIds[data.ChanId]; !ok {
93-
chanIds[data.ChanId] = true
93+
strChanId := fmt.Sprintf("%d", data.ChanId)
94+
if _, ok := chanIds.Get(strChanId); !ok {
95+
chanIds.Set(strChanId, true)
9496
chanIdLen = len(chanIds)
9597
}
98+
9699
}
97100

98101
// 数据全部接受完成,停止定时输出统计数据
@@ -117,7 +120,7 @@ func ReceivingResults(concurrent uint64, ch <-chan *RequestResults, wg *sync.Wai
117120
}
118121

119122
// 计算数据
120-
func calculateData(concurrent, processingTime, requestTime, maxTime, minTime, successNum, failureNum uint64, chanIdLen int, errCode map[int]int) {
123+
func calculateData(concurrent int, processingTime, requestTime, maxTime, minTime, successNum, failureNum uint64, chanIdLen int, errCode utils.ConcurrentMap) {
121124
if processingTime == 0 {
122125
processingTime = 1
123126
}
@@ -132,12 +135,12 @@ func calculateData(concurrent, processingTime, requestTime, maxTime, minTime, su
132135

133136
// 平均 每个协程成功数*总协程数据/总耗时 (每秒)
134137
if processingTime != 0 {
135-
qps = float64(successNum*1e9*concurrent) / float64(processingTime)
138+
qps = float64(successNum*1e9*uint64(concurrent)) / float64(processingTime)
136139
}
137140

138141
// 平均时长 总耗时/总请求数/并发数 纳秒=>毫秒
139142
if successNum != 0 && concurrent != 0 {
140-
averageTime = float64(processingTime) / float64(successNum*1e6*concurrent)
143+
averageTime = float64(processingTime) / float64(successNum*1e6*uint64(concurrent))
141144
}
142145

143146
// 纳秒=>毫秒
@@ -166,7 +169,7 @@ func header() {
166169
}
167170

168171
// 打印表格
169-
func table(successNum, failureNum uint64, errCode map[int]int, qps, averageTime, maxTimeFloat, minTimeFloat, requestTimeFloat float64, chanIdLen int) {
172+
func table(successNum, failureNum uint64, errCode utils.ConcurrentMap, qps, averageTime, maxTimeFloat, minTimeFloat, requestTimeFloat float64, chanIdLen int) {
170173
// 打印的时长都为毫秒
171174
result := fmt.Sprintf("%4.0fs│%7d│%7d│%7d│%8.2f│%8.2f│%8.2f│%8.2f│%v", requestTimeFloat, chanIdLen, successNum, failureNum, qps, maxTimeFloat, minTimeFloat, averageTime, printMap(errCode))
172175
fmt.Println(result)
@@ -175,13 +178,13 @@ func table(successNum, failureNum uint64, errCode map[int]int, qps, averageTime,
175178
}
176179

177180
// 输出错误码、次数 节约字符(终端一行字符大小有限)
178-
func printMap(errCode map[int]int) (mapStr string) {
181+
func printMap(errCode utils.ConcurrentMap) (mapStr string) {
179182

180183
var (
181184
mapArr []string
182185
)
183-
for key, value := range errCode {
184-
mapArr = append(mapArr, fmt.Sprintf("%d:%d", key, value))
186+
for key, value := range errCode.Items() {
187+
mapArr = append(mapArr, fmt.Sprintf("%s:%d", key, value))
185188
}
186189

187190
mapStr = strings.Join(mapArr, ";")

pkg/var.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package pkg
22

33
var (
4-
Concurrency uint64 // Concurrency is 并发请求数
4+
Concurrency int // Concurrency is 并发请求数
55
Number uint64 // Number is 单个协程的请求总数
66
Debug bool // Debug is 是否开其调试模式
77
Path string // Path is curl 文件路径

pkg/websocket.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func init() {
2222
}
2323

2424
// WebSocket is web socket go link
25-
func WebSocket(chanId uint64, ch chan<- *RequestResults, totalNumber uint64, wg *sync.WaitGroup, request *Request, ws *WebSocketC) {
25+
func WebSocket(chanId int, ch chan<- *RequestResults, totalNumber uint64, wg *sync.WaitGroup, request *Request, ws *WebSocketC) {
2626

2727
defer func() {
2828
wg.Done()
@@ -69,7 +69,7 @@ end:
6969
}
7070

7171
// webSocketRequest is 请求
72-
func webSocketRequest(chanId uint64, ch chan<- *RequestResults, i uint64, request *Request, ws *WebSocketC) {
72+
func webSocketRequest(chanId int, ch chan<- *RequestResults, i uint64, request *Request, ws *WebSocketC) {
7373

7474
var (
7575
startTime = time.Now()

0 commit comments

Comments
 (0)