Skip to content

Commit 909c06d

Browse files
NIFI-15025 Set 8 MB Buffer Size for RestLookupService Responses (#10355)
Signed-off-by: David Handermann <[email protected]>
1 parent 7d44d91 commit 909c06d

File tree

2 files changed

+175
-2
lines changed

2 files changed

+175
-2
lines changed

nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
211211
static final String MIME_TYPE_KEY = "mime.type";
212212
static final String BODY_KEY = "request.body";
213213
static final String METHOD_KEY = "request.method";
214-
214+
// Size in bytes (8 MB) of the BufferedInputStream that wraps the HTTP response body.
// Declared as a primitive int: it is only used as a buffer size and in integer arithmetic,
// so boxing it as Integer adds nothing but unboxing and a nullable constant.
static final int INPUT_STREAM_BUFFER_SIZE = 8 * 1024 * 1024;
215215
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
216216
URL,
217217
RECORD_READER,
@@ -383,7 +383,7 @@ public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, Stri
383383

384384
final Record record;
385385
try (final InputStream is = responseBody.byteStream();
386-
final InputStream bufferedIn = new BufferedInputStream(is)) {
386+
final InputStream bufferedIn = new BufferedInputStream(is, INPUT_STREAM_BUFFER_SIZE)) {
387387
record = handleResponse(bufferedIn, responseBody.contentLength(), context);
388388
}
389389

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.lookup;
18+
19+
import mockwebserver3.MockResponse;
20+
import mockwebserver3.MockWebServer;
21+
import org.apache.nifi.reporting.InitializationException;
22+
import org.apache.nifi.serialization.RecordReader;
23+
import org.apache.nifi.serialization.RecordReaderFactory;
24+
import org.apache.nifi.serialization.SimpleRecordSchema;
25+
import org.apache.nifi.serialization.record.MapRecord;
26+
import org.apache.nifi.serialization.record.Record;
27+
import org.apache.nifi.serialization.record.RecordField;
28+
import org.apache.nifi.serialization.record.RecordFieldType;
29+
import org.apache.nifi.serialization.record.RecordSchema;
30+
import org.apache.nifi.util.NoOpProcessor;
31+
import org.apache.nifi.util.TestRunner;
32+
import org.apache.nifi.util.TestRunners;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.Timeout;
37+
import org.junit.jupiter.api.extension.ExtendWith;
38+
import org.mockito.Answers;
39+
import org.mockito.Mock;
40+
import org.mockito.junit.jupiter.MockitoExtension;
41+
42+
import java.io.IOException;
43+
import java.io.InputStream;
44+
import java.util.LinkedHashMap;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.Optional;
48+
49+
import static java.net.HttpURLConnection.HTTP_OK;
50+
import static org.junit.jupiter.api.Assertions.assertTrue;
51+
import static org.junit.jupiter.api.Assertions.assertThrows;
52+
import static org.mockito.ArgumentMatchers.any;
53+
import static org.mockito.ArgumentMatchers.anyLong;
54+
import static org.mockito.Mockito.when;
55+
56+
@Timeout(10)
57+
@ExtendWith(MockitoExtension.class)
58+
class TestRestLookupServiceMarkReset {
59+
60+
private MockWebServer mockWebServer;
61+
private RestLookupService restLookupService;
62+
63+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
64+
private RecordReaderFactory recordReaderFactory;
65+
66+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
67+
private RecordReader recordReader;
68+
69+
private static final String SERVICE_ID = RestLookupService.class.getSimpleName() + "MarkReset";
70+
private static final String READER_ID = RecordReaderFactory.class.getSimpleName() + "MarkReset";
71+
72+
@BeforeEach
73+
void setUp() throws IOException, InitializationException {
74+
mockWebServer = new MockWebServer();
75+
mockWebServer.start();
76+
77+
TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
78+
restLookupService = new RestLookupService();
79+
80+
when(recordReaderFactory.getIdentifier()).thenReturn(READER_ID);
81+
runner.addControllerService(READER_ID, recordReaderFactory);
82+
runner.addControllerService(SERVICE_ID, restLookupService);
83+
84+
final String url = mockWebServer.url("/markreset").toString();
85+
runner.setProperty(restLookupService, RestLookupService.URL, url);
86+
runner.setProperty(restLookupService, RestLookupService.RECORD_READER, READER_ID);
87+
runner.enableControllerService(restLookupService);
88+
}
89+
90+
@AfterEach
91+
void tearDown() throws IOException {
92+
mockWebServer.close();
93+
}
94+
95+
@Test
96+
void testReaderUsesMarkResetBeyondBufferedStreamLimit() throws Exception {
97+
// Large JSON body to exceed the default BufferedInputStream buffer (8192 bytes)
98+
final int size = 20_000;
99+
String sb = '{' + "\"data\":\"" +
100+
"a".repeat(size) +
101+
"\"}";
102+
103+
mockWebServer.enqueue(new MockResponse.Builder()
104+
.code(HTTP_OK)
105+
.body(sb)
106+
.build());
107+
108+
// When the RecordReaderFactory is asked to create a reader, simulate mark/reset misuse
109+
when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenAnswer(invocation -> {
110+
final InputStream in = invocation.getArgument(1);
111+
if (in.markSupported()) {
112+
in.mark(1); // tiny read limit
113+
final byte[] buffer = new byte[4096];
114+
long total = 0;
115+
int read;
116+
while ((read = in.read(buffer)) != -1) {
117+
total += read;
118+
if (total > 10_000) { // read beyond the default buffer 8192
119+
break;
120+
}
121+
}
122+
// This reset would have failed before the fix when using BufferedInputStream over network stream
123+
in.reset();
124+
}
125+
return recordReader;
126+
});
127+
128+
// Return a simple record at once
129+
final RecordSchema schema = new SimpleRecordSchema(List.of(new RecordField("ok", RecordFieldType.BOOLEAN.getDataType())));
130+
final Record firstRecord = new MapRecord(schema, Map.of("ok", true));
131+
when(recordReader.nextRecord()).thenReturn(firstRecord, (Record) null);
132+
133+
final Map<String, Object> coordinates = new LinkedHashMap<>();
134+
final Optional<Record> result = restLookupService.lookup(coordinates);
135+
assertTrue(result.isPresent(), "Expected record to be present when reader performs mark/reset beyond buffer limit");
136+
}
137+
138+
@Test
139+
void testLookupThrowsWhenMarkResetExceedsBufferSize() throws Exception {
140+
// Create a body significantly larger than the configured buffer size (8 MB)
141+
final int size = RestLookupService.INPUT_STREAM_BUFFER_SIZE + 50_000;
142+
final String body = '{' + "\"data\":\"" + "x".repeat(size) + "\"}";
143+
144+
mockWebServer.enqueue(new MockResponse.Builder()
145+
.code(HTTP_OK)
146+
.body(body)
147+
.build());
148+
149+
// Simulate a RecordReader that misuses mark/reset: sets a tiny read limit,
150+
// reads far beyond the BufferedInputStream capacity, then attempts reset.
151+
when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenAnswer(invocation -> {
152+
final InputStream in = invocation.getArgument(1);
153+
if (in.markSupported()) {
154+
in.mark(1); // Tiny read limit
155+
final byte[] buffer = new byte[8192];
156+
long total = 0;
157+
int read;
158+
while ((read = in.read(buffer)) != -1) {
159+
total += read;
160+
if (total > RestLookupService.INPUT_STREAM_BUFFER_SIZE + 10_000) {
161+
break;
162+
}
163+
}
164+
// This reset should fail since we read beyond the mark's readlimit and the buffer size
165+
in.reset();
166+
}
167+
return recordReader; // Not expected to be used due to exception
168+
});
169+
170+
// Verify that the lookup wraps the reset failure into LookupFailureException
171+
assertThrows(LookupFailureException.class, () -> restLookupService.lookup(new LinkedHashMap<>()));
172+
}
173+
}

0 commit comments

Comments
 (0)