Skip to content

Commit fafec3f

Browse files
authored
Pipeliner expose queued commands (#3496)
* Pipeliner expose queued commands Signed-off-by: Xiaolong Chen <[email protected]> * add tests and update some comments Signed-off-by: Xiaolong Chen <[email protected]> --------- Signed-off-by: Xiaolong Chen <[email protected]>
1 parent 6b9cbe8 commit fafec3f

File tree

3 files changed

+37
-5
lines changed

3 files changed

+37
-5
lines changed

commands.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ var (
253253
_ Cmdable = (*Tx)(nil)
254254
_ Cmdable = (*Ring)(nil)
255255
_ Cmdable = (*ClusterClient)(nil)
256+
_ Cmdable = (*Pipeline)(nil)
256257
)
257258

258259
type cmdable func(ctx context.Context, cmd Cmder) error

pipeline.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
type pipelineExecer func(context.Context, []Cmder) error
99

10-
// Pipeliner is an mechanism to realise Redis Pipeline technique.
10+
// Pipeliner is a mechanism to realise Redis Pipeline technique.
1111
//
1212
// Pipelining is a technique to extremely speed up processing by packing
1313
// operations to batches, send them at once to Redis and read a replies in a
@@ -23,21 +23,24 @@ type pipelineExecer func(context.Context, []Cmder) error
2323
type Pipeliner interface {
2424
StatefulCmdable
2525

26-
// Len is to obtain the number of commands in the pipeline that have not yet been executed.
26+
// Len obtains the number of commands in the pipeline that have not yet been executed.
2727
Len() int
2828

2929
// Do is an API for executing any command.
3030
// If a certain Redis command is not yet supported, you can use Do to execute it.
3131
Do(ctx context.Context, args ...interface{}) *Cmd
3232

33-
// Process is to put the commands to be executed into the pipeline buffer.
33+
// Process puts the commands to be executed into the pipeline buffer.
3434
Process(ctx context.Context, cmd Cmder) error
3535

36-
// Discard is to discard all commands in the cache that have not yet been executed.
36+
// Discard discards all commands in the pipeline buffer that have not yet been executed.
3737
Discard()
3838

39-
// Exec is to send all the commands buffered in the pipeline to the redis-server.
39+
// Exec sends all the commands buffered in the pipeline to the redis server.
4040
Exec(ctx context.Context) ([]Cmder, error)
41+
42+
// Cmds returns the list of queued commands.
43+
Cmds() []Cmder
4144
}
4245

4346
var _ Pipeliner = (*Pipeline)(nil)
@@ -119,3 +122,7 @@ func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([
119122
func (c *Pipeline) TxPipeline() Pipeliner {
120123
return c
121124
}
125+
126+
func (c *Pipeline) Cmds() []Cmder {
127+
return c.cmds
128+
}

pipeline_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,30 @@ var _ = Describe("pipelining", func() {
3636
Expect(get.Val()).To(Equal(""))
3737
})
3838

39+
It("exports queued commands", func() {
40+
p := client.Pipeline()
41+
cmds := p.Cmds()
42+
Expect(cmds).To(BeEmpty())
43+
44+
p.Set(ctx, "foo", "bar", 0)
45+
p.Get(ctx, "foo")
46+
cmds = p.Cmds()
47+
Expect(cmds).To(HaveLen(p.Len()))
48+
Expect(cmds[0].Name()).To(Equal("set"))
49+
Expect(cmds[1].Name()).To(Equal("get"))
50+
51+
cmds, err := p.Exec(ctx)
52+
Expect(err).NotTo(HaveOccurred())
53+
Expect(cmds).To(HaveLen(2))
54+
Expect(cmds[0].Name()).To(Equal("set"))
55+
Expect(cmds[0].(*redis.StatusCmd).Val()).To(Equal("OK"))
56+
Expect(cmds[1].Name()).To(Equal("get"))
57+
Expect(cmds[1].(*redis.StringCmd).Val()).To(Equal("bar"))
58+
59+
cmds = p.Cmds()
60+
Expect(cmds).To(BeEmpty())
61+
})
62+
3963
assertPipeline := func() {
4064
It("returns no errors when there are no commands", func() {
4165
_, err := pipe.Exec(ctx)

0 commit comments

Comments
 (0)