
Commit 667c578

keith-turner authored and alerman committed
WIP adapt DW PR#2568 to use accumulo PR#4898
These draft changes build on #2568 with the following differences.

* Computes bulk v2 load plans using new, unreleased APIs in Accumulo PR 4898.
* Table splits are loaded at the beginning of writing to rfiles instead of at the end. The overall implications of this change for memory use in reducers are not yet clear. If deferring the loading of tablet splits is desired, the load plan could instead be computed after the rfile is closed, using a new API in 4898.
* Switches to Accumulo public APIs for writing rfiles instead of internal Accumulo methods (public once they are actually released).
* The algorithm that computes the load plan does less work per key/value: roughly constant time instead of log(N) (see the sketch after this message).
* Adds a simple SortedList class. It was added because this code does binary searches on lists, and it was not certain those lists were actually sorted. An unsorted list would not cause exceptions during a binary search, but it could lead to incorrect load plans and lost data. The new SortedList class ensures lists are sorted and lets that assurance travel around in the code. Maybe this change should be its own PR.
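
A rough illustration of the constant-time claim (not the code in this commit): because keys arrive at an rfile writer in sorted order, the writer can remember which split range the previous key fell into and only advance past split points the input has moved beyond, instead of binary searching the split list for every key. The sketch below uses plain Strings and a hypothetical SplitTracker class for brevity.

import java.util.List;

public class SplitTracker {
    private final List<String> splits; // sorted table split points
    private int idx = 0; // index of the split range the previous key fell into

    public SplitTracker(List<String> splits) {
        this.splits = splits;
    }

    /** Returns the split-range index for a row; amortized O(1) when rows arrive sorted. */
    public int rangeFor(String row) {
        while (idx < splits.size() && row.compareTo(splits.get(idx)) > 0) {
            idx++; // advance past split points the sorted input has already passed
        }
        return idx;
    }

    public static void main(String[] args) {
        SplitTracker tracker = new SplitTracker(List.of("20170601", "20170602", "20170603"));
        System.out.println(tracker.rangeFor("20170601_5")); // 1
        System.out.println(tracker.rangeFor("20170602_9")); // 2
        System.out.println(tracker.rangeFor("20170604_0")); // 3 (past the last split)
    }
}
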
1 parent a2ab2b0 commit 667c578

File tree

10 files changed: +176 -282 lines changed


pom.xml

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@
     <sonar.exclusions>**/StandardLexer.java,**/*.js,**/*.css,**/*.html</sonar.exclusions>
     <sonar.sourceEncoding>UTF-8</sonar.sourceEncoding>
     <surefire.forkCount>1C</surefire.forkCount>
-    <version.accumulo>2.1.2</version.accumulo>
+    <version.accumulo>2.1.4-97e4684860</version.accumulo>
     <version.arquillian>1.4.1.Final</version.arquillian>
     <version.arquillian-weld-ee-embedded>1.0.0.Final</version.arquillian-weld-ee-embedded>
     <version.assertj>3.20.2</version.assertj>

warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java

Lines changed: 2 additions & 3 deletions
@@ -89,7 +89,6 @@
  */
 public final class BulkIngestMapFileLoader implements Runnable {
     private static final Logger log = Logger.getLogger(BulkIngestMapFileLoader.class);
-    private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();
     private static int SLEEP_TIME = 30000;
     private static int FAILURE_SLEEP_TIME = 10 * 60 * 1000; // 10 minutes
     protected static int MAX_DIRECTORIES = 1;

@@ -1071,13 +1070,13 @@ private LoadPlan getLoadPlan() throws IOException {
                     in.readFully(0, buffer);
                     String s = new String(buffer, StandardCharsets.UTF_8);
                     // TODO: Use Gson streaming api instead to minimize impact on heap and cpu
-                    builder.addPlan(gson.fromJson(s, LoadPlan.class));
+                    builder.addPlan(LoadPlan.fromJson(s));
                 }
             }
             LoadPlan lp = builder.build();
             log.debug("Completed deserializing load plan for " + tableDir);
             if (log.isTraceEnabled()) {
-                log.trace("Consolidated LoadPlan for " + tableDir + ": " + gson.toJson(lp));
+                log.trace("Consolidated LoadPlan for " + tableDir + ": " + lp.toJson());
             }
             return lp;
         }

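The change above drops the hand-rolled Gson setup in favor of LoadPlan's own JSON methods from the not-yet-released Accumulo API referenced in the commit message. A minimal sketch of that round trip, assuming LoadPlan.toJson() and LoadPlan.fromJson(String) behave as used in this diff (the file name and rows below are made up for illustration):

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.hadoop.io.Text;

public class LoadPlanJsonExample {
    public static void main(String[] args) {
        // Build a plan that maps one rfile to the tablet range (row_a, row_b].
        LoadPlan plan = LoadPlan.builder().loadFileTo("I0000.rf", RangeType.TABLE, new Text("row_a"), new Text("row_b")).build();

        // Each reducer could serialize its plan next to the rfiles it produced.
        String json = plan.toJson();

        // The map file loader later consolidates the per-file plans into one.
        LoadPlan consolidated = LoadPlan.builder().addPlan(LoadPlan.fromJson(json)).build();
        System.out.println(consolidated.getDestinations().size());
    }
}
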
warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java

Lines changed: 41 additions & 224 deletions
Large diffs are not rendered by default.

warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+package datawave.ingest.mapreduce.job;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps a list that is immutable and verified as sorted.
+ */
+public class SortedList<T> {
+
+    private static final Logger log = Logger.getLogger(SortedList.class);
+
+    private final List<T> list;
+
+    private SortedList(List<T> list) {
+        this.list = list;
+    }
+
+    public List<T> get() {
+        return list;
+    }
+
+    private static final SortedList<?> EMPTY = new SortedList<>(List.of());
+
+    @SuppressWarnings("unchecked")
+    public static <T2> SortedList<T2> empty() {
+        return (SortedList<T2>) EMPTY;
+    }
+
+    /**
+     * For a list that is expected to be sorted this will verify it is sorted and if so return an immutable copy of it. If this list is not sorted it will log a
+     * warning, copy it, sort the copy, and return an immutable version of the copy.
+     */
+    public static <T2> SortedList<T2> fromSorted(List<T2> list) {
+        if (list.isEmpty()) {
+            return empty();
+        }
+
+        var copy = List.copyOf(list);
+
+        // verify after copying because nothing can change at this point
+        boolean isSorted = true;
+        for (int i = 1; i < copy.size(); i++) {
+            @SuppressWarnings("unchecked")
+            var prev = (Comparable<? super T2>) copy.get(i - 1);
+            if (prev.compareTo(copy.get(i)) > 0) {
+                isSorted = false;
+            }
+        }
+
+        if (isSorted) {
+            return new SortedList<>(copy);
+        } else {
+            log.warn("Input list of size " + copy.size() + " was expected to be sorted but was not", new IllegalArgumentException());
+            return fromUnsorted(copy);
+        }
+    }
+
+    /**
+     * Copies a list and sorts the copy returning an immutable version of the copy.
+     */
+    public static <T2> SortedList<T2> fromUnsorted(List<T2> list) {
+        if (list.isEmpty()) {
+            return empty();
+        }
+
+        var copy = new ArrayList<>(list);
+        @SuppressWarnings("unchecked")
+        var comparator = (Comparator<? super T2>) Comparator.naturalOrder();
+        copy.sort(comparator); // sort the copy, not the caller's list, so the wrapped list really is sorted
+        return new SortedList<>(Collections.unmodifiableList(copy));
+    }
+}

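A short usage sketch for the SortedList helper above (illustrative only, assuming it is on the classpath under datawave.ingest.mapreduce.job): the wrapper verifies ordering once and then hands out an immutable view that is safe for the binary searches done elsewhere in this change.

import java.util.Collections;
import java.util.List;

import datawave.ingest.mapreduce.job.SortedList;

public class SortedListExample {
    public static void main(String[] args) {
        // Already sorted: verified and wrapped without re-sorting.
        SortedList<String> shards = SortedList.fromSorted(List.of("20170601_0", "20170601_1", "20170602_0"));

        // Not sorted: copied and sorted before wrapping.
        SortedList<String> repaired = SortedList.fromUnsorted(List.of("20170602_0", "20170601_0"));

        // get() exposes the immutable sorted view used for Collections.binarySearch.
        System.out.println(Collections.binarySearch(shards.get(), "20170601_1")); // 1
        System.out.println(repaired.get()); // [20170601_0, 20170602_0]
    }
}
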
warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java

Lines changed: 2 additions & 2 deletions
@@ -184,7 +184,7 @@ private static boolean shardsAreBalanced(Map<Text,String> locations, String date
         return dateIsBalanced;
     }
 
-    public static Map<String,List<Text>> getSplits(Configuration conf) throws IOException {
+    public static Map<String,SortedList<Text>> getSplits(Configuration conf) throws IOException {
         return TableSplitsCache.getCurrentCache(conf).getSplits();
 
     }

@@ -193,7 +193,7 @@ public static Map<Text,String> getSplitsAndLocations(Configuration conf, String
         return TableSplitsCache.getCurrentCache(conf).getSplitsAndLocationByTable(tableName);
     }
 
-    public static List<Text> getSplits(Configuration conf, String tableName) throws IOException {
+    public static SortedList<Text> getSplits(Configuration conf, String tableName) throws IOException {
         return TableSplitsCache.getCurrentCache(conf).getSplits(tableName);
     }
 }

warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java

Lines changed: 15 additions & 7 deletions
@@ -69,7 +69,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil {
 
     private Path splitsPath = null;
     private Map<String,Map<Text,String>> splitLocations = new HashMap<>();
-    private Map<String,List<Text>> splits = new HashMap<>();
+    private Map<String,SortedList<Text>> splits = new HashMap<>();
 
     private PartitionerCache partitionerCache;
 

@@ -116,6 +116,7 @@ public static void clear() {
      *            maximum number of splits to return
      * @return split points grouped into fewer evenly grouped splits so as not to exceed maxSplits
      */
+    // TODO seems like this code is only used in test
     public static List<Text> trimSplits(List<Text> tableSplits, int maxSplits) {
         if (tableSplits.size() <= maxSplits) {
             return tableSplits;

@@ -363,6 +364,7 @@ protected void readCache(BufferedReader in) throws IOException {
         String line;
         String tableName = null;
         Map tmpSplitLocations = new ShardLocationTrieMap();
+        Map<String,List<Text>> tableSplits = new HashMap<>();
         List<Text> tmpSplits = null;
 
         while ((line = in.readLine()) != null) {

@@ -374,7 +376,7 @@ protected void readCache(BufferedReader in) throws IOException {
                 tableName = parts[0];
                 tmpSplitLocations = new ShardLocationTrieMap();
                 tmpSplits = new ArrayList<>();
-                this.splits.put(tableName, Collections.unmodifiableList(tmpSplits));
+                tableSplits.put(tableName, tmpSplits);
             }
             if (parts.length >= 2) {
                 Text split = new Text(Base64.decodeBase64(parts[1]));

@@ -387,6 +389,11 @@
         if (!tmpSplitLocations.isEmpty()) {
             this.splitLocations.put(tableName, tmpSplitLocations);
         }
+
+        tableSplits.forEach((table, splits) -> {
+            this.splits.put(table, SortedList.fromSorted(splits));
+        });
+
         in.close();
     }
 

@@ -407,12 +414,12 @@ private String dedup(Map<String,String> dedupMap, String value) {
      * @throws IOException
      *             for issues with read or write
      */
-    public List<Text> getSplits(String table) throws IOException {
+    public SortedList<Text> getSplits(String table) throws IOException {
         if (this.splits.isEmpty()) {
             read();
         }
-        List<Text> splitList = this.splits.get(table);
-        return (splitList == null ? Collections.emptyList() : splitList);
+        SortedList<Text> splitList = this.splits.get(table);
+        return (splitList == null ? SortedList.empty() : splitList);
     }
 
     /**

@@ -425,16 +432,17 @@ public List<Text> getSplits(String table) throws IOException {
      * @throws IOException
      *             for issues with read or write
      */
+    // TODO seems like this code is only used in test, can it be removed? Looked into making it return SortedList
     public List<Text> getSplits(String table, int maxSplits) throws IOException {
-        return trimSplits(getSplits(table), maxSplits);
+        return trimSplits(getSplits(table).get(), maxSplits);
     }
 
     /**
      * @return map of table name to list of splits for the table
      * @throws IOException
      *             for issues with read or write
      */
-    public Map<String,List<Text>> getSplits() throws IOException {
+    public Map<String,SortedList<Text>> getSplits() throws IOException {
         if (this.splits.isEmpty())
             read();
         return Collections.unmodifiableMap(splits);

warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java

Lines changed: 5 additions & 4 deletions
@@ -20,6 +20,7 @@
 import datawave.ingest.mapreduce.handler.shard.ShardIdFactory;
 import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;
 import datawave.ingest.mapreduce.job.BulkIngestKey;
+import datawave.ingest.mapreduce.job.SortedList;
 import datawave.ingest.mapreduce.job.SplitsFile;
 import datawave.util.time.DateHelper;
 

@@ -131,7 +132,7 @@ private HashMap<Text,Integer> getPartitionsByShardId(String tableName) throws IO
         if (log.isDebugEnabled())
             log.debug("Loading splits data for " + tableName);
 
-        List<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
+        SortedList<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
         Map<Text,String> shardIdToLocation = SplitsFile.getSplitsAndLocations(conf, tableName);
 
         if (log.isDebugEnabled())

@@ -155,7 +156,7 @@ private HashMap<Text,Integer> getPartitionsByShardId(String tableName) throws IO
      *            the map of shard ids and their location
      * @return shardId to
      */
-    private HashMap<Text,Integer> assignPartitionsForEachShard(List<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
+    private HashMap<Text,Integer> assignPartitionsForEachShard(SortedList<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
         int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations);
 
         HashMap<String,Integer> partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations);

@@ -175,11 +176,11 @@ private int calculateNumberOfUniqueTservers(Map<Text,String> shardIdToLocations)
         return totalNumUniqueTServers;
     }
 
-    private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, List<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
+    private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, SortedList<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
         HashMap<String,Integer> partitionsByTServer = new HashMap<>(totalNumTServers);
         int nextAvailableSlot = 0;
         boolean alreadySkippedFutureShards = false;
-        for (Text shard : sortedShardIds) {
+        for (Text shard : sortedShardIds.get()) {
             if (alreadySkippedFutureShards || !isFutureShard(shard)) { // short circuiting for performance
                 alreadySkippedFutureShards = true;
                 String location = shardIdsToTservers.get(shard);

warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java

Lines changed: 4 additions & 3 deletions
@@ -16,6 +16,7 @@
 import org.apache.log4j.Logger;
 
 import datawave.ingest.mapreduce.job.BulkIngestKey;
+import datawave.ingest.mapreduce.job.SortedList;
 import datawave.ingest.mapreduce.job.SplitsFile;
 
 /**

@@ -80,7 +81,7 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
 
         String tableName = key.getTableName().toString();
 
-        List<Text> cutPointArray = null;
+        SortedList<Text> cutPointArray = null;
         try {
             cutPointArray = SplitsFile.getSplits(conf, tableName);
         } catch (IOException e) {

@@ -90,8 +91,8 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
             return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions;
         }
         key.getKey().getRow(holder);
-        int index = Collections.binarySearch(cutPointArray, holder);
-        index = calculateIndex(index, numPartitions, tableName, cutPointArray.size());
+        int index = Collections.binarySearch(cutPointArray.get(), holder);
+        index = calculateIndex(index, numPartitions, tableName, cutPointArray.get().size());
 
         index = partitionLimiter.limit(numPartitions, index);

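For context on the binarySearch call above: Collections.binarySearch returns the element's index on an exact match and (-(insertionPoint) - 1) when the row falls between split points, so either way the row maps to a stable bucket. A generic sketch of that conversion (bucketFor is a hypothetical helper, not DataWave's calculateIndex, which also factors in numPartitions):

import java.util.Collections;
import java.util.List;

public class CutPointExample {
    // Maps a row to the index of the first split point >= the row.
    static int bucketFor(List<String> sortedCutPoints, String row) {
        int index = Collections.binarySearch(sortedCutPoints, row);
        if (index < 0) {
            index = -index - 1; // insertion point: first split greater than the row
        }
        return Math.min(index, sortedCutPoints.size() - 1);
    }

    public static void main(String[] args) {
        List<String> cutPoints = List.of("20170601", "20170602", "20170603");
        System.out.println(bucketFor(cutPoints, "20170601"));    // 0 (exact match)
        System.out.println(bucketFor(cutPoints, "20170601_55")); // 1 (between split points)
        System.out.println(bucketFor(cutPoints, "20241231"));    // 2 (clamped to the last split)
    }
}
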
warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java

Lines changed: 24 additions & 33 deletions
@@ -1,7 +1,7 @@
 package datawave.ingest.mapreduce.job;
 
 import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
-import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findKeyExtent;
+import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findContainingSplits;
 import static org.junit.Assert.assertEquals;
 
 import java.io.DataOutputStream;

@@ -47,7 +47,6 @@
 
 import datawave.ingest.data.config.ingest.AccumuloHelper;
 import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
-import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.KeyExtent;
 import datawave.util.TableName;
 
 public class MultiRFileOutputFormatterTest {

@@ -226,19 +225,20 @@ public void testPlanning() {
         rfileRows.add(new Text("20170601_9"));
         rfileRows.add(new Text("20200601_9"));
 
-        Set<KeyExtent> expectedExtents = new HashSet<>();
-        expectedExtents.add(new KeyExtent(new Text("20170601_0"), new Text("20170601_1")));
-        expectedExtents.add(new KeyExtent(new Text("20170601_8"), new Text("20170601_9")));
-        expectedExtents.add(new KeyExtent(new Text("20170602_0"), new Text("20170602_1")));
-        expectedExtents.add(new KeyExtent(new Text("20170603_9"), null));
-        expectedExtents.add(new KeyExtent(new Text("20170603_0c"), new Text("20170603_1")));
-        expectedExtents.add(new KeyExtent(null, new Text("20170601_0")));
-        expectedExtents.add(new KeyExtent(new Text("20170602_9c"), new Text("20170603_0")));
-        expectedExtents.add(new KeyExtent(new Text("20170603_0a"), new Text("20170603_0b")));
-        expectedExtents.add(new KeyExtent(new Text("20170603_0b"), new Text("20170603_0c")));
+        Set<LoadPlan.TableSplits> expectedExtents = new HashSet<>();
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_0"), new Text("20170601_1")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_8"), new Text("20170601_9")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_0"), new Text("20170602_1")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_9"), null));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0c"), new Text("20170603_1")));
+        expectedExtents.add(new LoadPlan.TableSplits(null, new Text("20170601_0")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_9c"), new Text("20170603_0")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0a"), new Text("20170603_0b")));
+        expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0b"), new Text("20170603_0c")));
 
         List<Text> tableSplits = getSplits();
-        Set<KeyExtent> extents = rfileRows.stream().map(row -> findKeyExtent(row, tableSplits)).collect(Collectors.toCollection(HashSet::new));
+        Set<LoadPlan.TableSplits> extents = rfileRows.stream().map(row -> findContainingSplits(row, tableSplits))
+                        .collect(Collectors.toCollection(HashSet::new));
 
         assertEquals(expectedExtents, extents);
     }

@@ -470,37 +470,28 @@ protected Map<Text,String> getShardLocations(String tableName) throws IOExceptio
         }
 
         @Override
-        protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) {
+        protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) {
             filenames.add(filename);
-            return new SizeTrackingWriter(new FileSKVWriter() {
+            return new SizeTrackingWriter(null) {
+                public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
 
-                @Override
-                public boolean supportsLocalityGroups() {
-                    return false;
                 }
 
-                @Override
-                public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {}
+                public void startDefaultLocalityGroup() throws IOException {
 
-                @Override
-                public void startDefaultLocalityGroup() throws IOException {}
+                }
 
-                @Override
-                public DataOutputStream createMetaStore(String name) throws IOException {
-                    return null;
+                public void append(Key key, Value value) throws IOException {
+                    entries++;
+                    size += key.getLength() + (value == null ? 0 : value.getSize());
                 }
 
-                @Override
                 public void close() throws IOException {}
 
-                @Override
-                public long getLength() throws IOException {
-                    return 0;
+                public LoadPlan getLoadPlan(String filename) {
+                    return LoadPlan.builder().build();
                 }
-
-                @Override
-                public void append(Key key, Value value) throws IOException {}
-            }, false);
+            };
         }
     };
 