@@ -2245,102 +2245,88 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
22452245 ASSERT_EQ (consumer.getSubscriptionName (), subName);
22462246 LOG_INFO (" created pattern consumer with not match topics at beginning" );
22472247
2248+ auto createProducer = [&client](Producer &producer, const std::string &topic, int numPartitions) {
2249+ if (numPartitions > 0 ) {
2250+ const std::string url = adminUrl + " admin/v2/persistent/public/default/" + topic + " /partitions" ;
2251+ int res = makePutRequest (url, std::to_string (numPartitions));
2252+ ASSERT_TRUE (res == 204 || res == 409 );
2253+ }
2254+
2255+ const std::string fullTopicName = " persistent://public/default/" + topic;
2256+ Result result = client.createProducer (fullTopicName, producer);
2257+ ASSERT_EQ (ResultOk, result);
2258+ };
2259+
22482260 // 2. create 4 topics, in which 3 match the pattern.
2249- std::string topicName1 = " persistent://public/default/patternTopicsAutoConsumerPubSub1" ;
2250- std::string topicName2 = " persistent://public/default/patternTopicsAutoConsumerPubSub2" ;
2251- std::string topicName3 = " persistent://public/default/patternTopicsAutoConsumerPubSub3" ;
2261+ std::vector<Producer> producers (4 );
2262+ createProducer (producers[0 ], " patternTopicsAutoConsumerPubSub1" , 2 );
2263+ createProducer (producers[1 ], " patternTopicsAutoConsumerPubSub2" , 3 );
2264+ createProducer (producers[2 ], " patternTopicsAutoConsumerPubSub3" , 4 );
22522265 // This will not match pattern
2253- std::string topicName4 = " persistent://public/default/ notMatchPatternTopicsAutoConsumerPubSub4" ;
2266+ createProducer (producers[ 3 ], " notMatchPatternTopicsAutoConsumerPubSub4" , 4 ) ;
22542267
2255- // call admin api to make topics partitioned
2256- std::string url1 =
2257- adminUrl + " admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub1/partitions" ;
2258- std::string url2 =
2259- adminUrl + " admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub2/partitions" ;
2260- std::string url3 =
2261- adminUrl + " admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub3/partitions" ;
2262- std::string url4 =
2263- adminUrl + " admin/v2/persistent/public/default/notMatchPatternTopicsAutoConsumerPubSub4/partitions" ;
2268+ constexpr int messageNumber = 100 ;
22642269
2265- int res = makePutRequest (url1, " 2" );
2266- ASSERT_FALSE (res != 204 && res != 409 );
2267- res = makePutRequest (url2, " 3" );
2268- ASSERT_FALSE (res != 204 && res != 409 );
2269- res = makePutRequest (url3, " 4" );
2270- ASSERT_FALSE (res != 204 && res != 409 );
2271- res = makePutRequest (url4, " 4" );
2272- ASSERT_FALSE (res != 204 && res != 409 );
2270+ std::thread consumeThread ([&consumer] {
2271+ LOG_INFO (" Consuming and acking 300 messages by pattern topics consumer" );
2272+ for (int i = 0 ; i < 3 * messageNumber; i++) {
2273+ Message m;
2274+ // Ensure new topics can be discovered when the consumer is blocked by receive(Message&, int)
2275+ ASSERT_EQ (ResultOk, consumer.receive (m, 30000 ));
2276+ ASSERT_EQ (ResultOk, consumer.acknowledge (m));
2277+ }
2278+ // 5. pattern consumer already subscribed 3 topics
2279+ LOG_INFO (" Consumed and acked 300 messages by pattern topics consumer" );
22732280
2274- Producer producer1;
2275- result = client.createProducer (topicName1, producer1);
2276- ASSERT_EQ (ResultOk, result);
2277- Producer producer2;
2278- result = client.createProducer (topicName2, producer2);
2279- ASSERT_EQ (ResultOk, result);
2280- Producer producer3;
2281- result = client.createProducer (topicName3, producer3);
2282- ASSERT_EQ (ResultOk, result);
2283- Producer producer4;
2284- result = client.createProducer (topicName4, producer4);
2285- ASSERT_EQ (ResultOk, result);
2286- LOG_INFO (" created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match" );
2281+ // verify no more to receive, because producers[3] not match pattern
2282+ Message m;
2283+ ASSERT_EQ (ResultTimeout, consumer.receive (m, 1000 ));
2284+ });
22872285
22882286 // 3. wait enough time to trigger auto discovery
2289- std::this_thread::sleep_for (std::chrono::microseconds ( 2 * 1000 * 1000 ));
2287+ std::this_thread::sleep_for (std::chrono::seconds ( 2 ));
22902288
22912289 // 4. produce data.
2292- int messageNumber = 100 ;
2293- std::string msgContent = " msg-content" ;
2294- LOG_INFO (" Publishing 100 messages by producer 1 synchronously" );
2295- for (int msgNum = 0 ; msgNum < messageNumber; msgNum++) {
2296- std::stringstream stream;
2297- stream << msgContent << msgNum;
2298- Message msg = MessageBuilder ().setContent (stream.str ()).build ();
2299- ASSERT_EQ (ResultOk, producer1.send (msg));
2300- }
2301-
2302- msgContent = " msg-content2" ;
2303- LOG_INFO (" Publishing 100 messages by producer 2 synchronously" );
2304- for (int msgNum = 0 ; msgNum < messageNumber; msgNum++) {
2305- std::stringstream stream;
2306- stream << msgContent << msgNum;
2307- Message msg = MessageBuilder ().setContent (stream.str ()).build ();
2308- ASSERT_EQ (ResultOk, producer2.send (msg));
2290+ for (size_t i = 0 ; i < producers.size (); i++) {
2291+ const std::string msgContent = " msg-content" + std::to_string (i);
2292+ LOG_INFO (" Publishing " << messageNumber << " messages by producer " << i << " synchronously" );
2293+ for (int j = 0 ; j < messageNumber; j++) {
2294+ Message msg = MessageBuilder ().setContent (msgContent).build ();
2295+ ASSERT_EQ (ResultOk, producers[i].send (msg));
2296+ }
23092297 }
23102298
2311- msgContent = " msg-content3" ;
2312- LOG_INFO (" Publishing 100 messages by producer 3 synchronously" );
2313- for (int msgNum = 0 ; msgNum < messageNumber; msgNum++) {
2314- std::stringstream stream;
2315- stream << msgContent << msgNum;
2316- Message msg = MessageBuilder ().setContent (stream.str ()).build ();
2317- ASSERT_EQ (ResultOk, producer3.send (msg));
2318- }
2299+ consumeThread.join ();
23192300
2320- msgContent = " msg-content4" ;
2321- LOG_INFO (" Publishing 100 messages by producer 4 synchronously" );
2322- for (int msgNum = 0 ; msgNum < messageNumber; msgNum++) {
2323- std::stringstream stream;
2324- stream << msgContent << msgNum;
2325- Message msg = MessageBuilder ().setContent (stream.str ()).build ();
2326- ASSERT_EQ (ResultOk, producer4.send (msg));
2327- }
2301+ consumeThread = std::thread ([&consumer] {
2302+ LOG_INFO (" Consuming and acking 100 messages by pattern topics consumer" );
2303+ for (int i = 0 ; i < messageNumber; i++) {
2304+ Message m;
2305+ // Ensure new topics can be discovered when the consumer is blocked by receive(Message&)
2306+ ASSERT_EQ (ResultOk, consumer.receive (m));
2307+ ASSERT_EQ (ResultOk, consumer.acknowledge (m));
2308+ }
2309+ // 9. pattern consumer subscribed a new topic
2310+ LOG_INFO (" Consumed and acked 100 messages by pattern topics consumer" );
23282311
2329- // 5. pattern consumer already subscribed 3 topics
2330- LOG_INFO (" Consuming and acking 300 messages by pattern topics consumer" );
2331- for (int i = 0 ; i < 3 * messageNumber; i++) {
2312+ // verify no more to receive
23322313 Message m;
2333- ASSERT_EQ (ResultOk , consumer.receive (m, 1000 ));
2334- ASSERT_EQ (ResultOk, consumer. acknowledge (m) );
2335- }
2336- LOG_INFO ( " Consumed and acked 300 messages by pattern topics consumer " );
2314+ ASSERT_EQ (ResultTimeout , consumer.receive (m, 1000 ));
2315+ } );
2316+ // 6. Create a producer to a new topic
2317+ createProducer (producers[ 0 ], " patternTopicsAutoConsumerPubSub5 " , 4 );
23372318
2338- // verify no more to receive, because producer4 not match pattern
2339- Message m;
2340- ASSERT_EQ (ResultTimeout, consumer.receive (m, 1000 ));
2319+ // 7. wait enough time to trigger auto discovery
2320+ std::this_thread::sleep_for (std::chrono::seconds (2 ));
23412321
2342- ASSERT_EQ (ResultOk, consumer.unsubscribe ());
2322+ // 8. produce data
2323+ for (int i = 0 ; i < messageNumber; i++) {
2324+ Message msg = MessageBuilder ().setContent (" msg-content-5" ).build ();
2325+ ASSERT_EQ (ResultOk, producers[0 ].send (msg));
2326+ }
23432327
2328+ consumeThread.join ();
2329+ ASSERT_EQ (ResultOk, consumer.unsubscribe ());
23442330 client.shutdown ();
23452331}
23462332
0 commit comments