-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Closed
apache/pulsar-client-go
#823Closed
Copy link
Labels
Description
Local docker-compose.yaml:
version: "3.7"
services:
  pulsar:
    image: apachepulsar/pulsar-all:2.9.3
    command: bin/pulsar standalone
    hostname: pulsar
    ports:
      - "8080:8080"
      - "6650:6650"
    restart: unless-stopped
    volumes:
      - "./data/:/pulsar/data"
  dashboard:
    image: apachepulsar/pulsar-manager:v0.3.0
    ports:
      - "9527:9527"
      - "7750:7750"
    depends_on:
      - pulsar
    links:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties

Go test code:
func TestBytesSchema(t *testing.T) {
client := createClient()
defer client.Close()
producer1, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "bytesTopic",
Schema: pulsar.NewBytesSchema(nil),
})
assert.Nil(t, err)
if err != nil {
log.Fatal(err)
}
_, err = producer1.Send(context.Background(), &pulsar.ProducerMessage{
Value: []byte("test"),
})
if err != nil {
log.Fatal(err)
}
producer1.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "bytesTopic",
SubscriptionName: "sub-1",
Schema: pulsar.NewBytesSchema(nil),
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}The error find in code producer1.Send function.
Pulsar error log:
docker logs --tail=20 -f pulsarx_pulsar_1
2022-08-07T09:56:20,928+0000 [pulsar-io-29-7] WARN org.apache.pulsar.broker.service.ServerCnx - [/172.31.0.1:52376] Got exception java.lang.IllegalStateException: Some required fields are missing
at org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
at org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
at org.apache.pulsar.common.api.proto.CommandProducer.parseFrom(CommandProducer.java:511)
at org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2494)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:109)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)