
Commit 32bdbc5

keith-ratcliffe authored and alerman committed
Bulk V2
WIP adapt DW PR #2568 to use Accumulo PR #4898. These draft changes build on #2568 with the following differences.

* Computes bulk v2 load plans using new, unreleased APIs in Accumulo PR 4898.
* The table splits are loaded at the beginning of writing to rfiles instead of at the end. Not sure about the overall implications of this change on memory use in reducers. If deferring the loading of tablet splits is desired, the load plan could instead be computed after the rfile is closed, using a new API in 4898.
* Switches to using Accumulo public APIs for writing rfiles instead of internal Accumulo methods. Well, public once they are actually released.
* The algorithm to compute the load plan does less work per key/value: roughly constant time instead of log(N).
* Adds a simple SortedList class (a sketch of the idea follows below). It was added because this code does binary searches on lists, and it was not certain those lists were actually sorted. An unsorted list would not cause exceptions in binary search, but could lead to incorrect load plans and lost data. SortedList ensures lists are sorted and lets that assurance travel around in the code. Maybe this change should be its own PR.
1 parent: be9e20d · commit: 32bdbc5
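As a rough illustration of the SortedList idea, here is a minimal, hypothetical sketch (not the actual class added in this commit): sortedness is verified once, at construction, so every later binary search against the wrapped list is known to be valid.

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Minimal sketch of the SortedList concept (hypothetical, for illustration):
 * verify order once, up front, so later binary searches are safe by construction.
 */
public class SortedList<T> {
    private final List<T> list;
    private final Comparator<? super T> comparator;

    private SortedList(List<T> list, Comparator<? super T> comparator) {
        this.list = list;
        this.comparator = comparator;
    }

    /** Wraps the list, failing fast if it is not sorted. The O(N) check runs once. */
    public static <T> SortedList<T> of(List<T> list, Comparator<? super T> comparator) {
        for (int i = 1; i < list.size(); i++) {
            if (comparator.compare(list.get(i - 1), list.get(i)) > 0) {
                throw new IllegalArgumentException("list is not sorted at index " + i);
            }
        }
        return new SortedList<>(Collections.unmodifiableList(list), comparator);
    }

    /** Safe because sortedness was established when this wrapper was created. */
    public int search(T key) {
        return Collections.binarySearch(list, key, comparator);
    }
}

This matters because Collections.binarySearch on an unsorted list fails silently: its result is undefined rather than an exception, which is exactly how an unsorted list could yield incorrect load plans and lost data with no visible error.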

File tree

20 files changed: +827 -129 lines changed

contrib/datawave-quickstart/bin/services/datawave/bootstrap-ingest.sh

Lines changed: 3 additions & 3 deletions
@@ -9,7 +9,7 @@ DW_DATAWAVE_INGEST_HOME="${DW_CLOUD_HOME}/${DW_DATAWAVE_INGEST_SYMLINK}"
 # ingest reducers. Set to 1 for standalone instance, but typically set to the first prime number that is less than the
 # number of available Accumulo tablet servers...
-DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-1}
+DW_DATAWAVE_INGEST_NUM_SHARDS=${DW_DATAWAVE_INGEST_NUM_SHARDS:-10}
 
 # Ingest job logs, etc
@@ -39,7 +39,7 @@ DW_DATAWAVE_INGEST_FLAGFILE_DIR="${DW_DATAWAVE_DATA_DIR}/flags"
 # Comma-delimited list of configs for the FlagMaker process(es)
-DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml"}
+DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS=${DW_DATAWAVE_INGEST_FLAGMAKER_CONFIGS:-"${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-live.xml,${DW_DATAWAVE_INGEST_CONFIG_HOME}/flag-maker-bulk.xml"}
 
 # Dir for ingest-related 'pid' files
@@ -76,7 +76,7 @@ DW_DATAWAVE_INGEST_LIVE_DATA_TYPES=${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES:-"wikipe
 # Comma-delimited data type identifiers to be ingested via "bulk" ingest, ie via bulk import of RFiles into Accumulo tables
-DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats"}
+DW_DATAWAVE_INGEST_BULK_DATA_TYPES=${DW_DATAWAVE_INGEST_BULK_DATA_TYPES:-"shardStats,wikipedia,mycsv,myjson"}
 
 DW_DATAWAVE_MAPRED_INGEST_OPTS=${DW_DATAWAVE_MAPRED_INGEST_OPTS:-"-useInlineCombiner -ingestMetricsDisabled"}

contrib/datawave-quickstart/bin/services/datawave/install-ingest.sh

Lines changed: 17 additions & 1 deletion
@@ -38,12 +38,14 @@ tar xf "${DW_DATAWAVE_SERVICE_DIR}/${DW_DATAWAVE_INGEST_DIST}" -C "${TARBALL_BAS
 datawaveIngestIsInstalled || ( fatal "DataWave Ingest was not installed" && exit 1 )
 info "DataWave Ingest tarball extracted and symlinked"
 
+source "${THIS_DIR}/fix-hadoop-classpath.sh"
+
 if ! hadoopIsRunning ; then
    info "Starting Hadoop, so that we can initialize Accumulo"
    hadoopStart
 fi
 
-# Create any Hadoop directories related to Datawave Ingest
+# Create any Hadoop directories needed for live ingest input
 if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
 
    OLD_IFS="${IFS}"
@@ -56,6 +58,20 @@ if [[ -n "${DW_DATAWAVE_INGEST_LIVE_DATA_TYPES}" ]] ; then
    done
 fi
 
+# Create any Hadoop directories needed for bulk ingest input
+if [[ -n "${DW_DATAWAVE_INGEST_BULK_DATA_TYPES}" ]] ; then
+
+   OLD_IFS="${IFS}"
+   IFS=","
+   HDFS_RAW_INPUT_DIRS=( ${DW_DATAWAVE_INGEST_BULK_DATA_TYPES} )
+   IFS="${OLD_IFS}"
+
+   for dir in "${HDFS_RAW_INPUT_DIRS[@]}" ; do
+      # Dirs created here should be configured in your bulk flag maker config (e.g., in config/flag-maker-bulk.xml)
+      hdfs dfs -mkdir -p "${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/${dir}-bulk" || fatal "Failed to create HDFS directory: ${dir}-bulk"
+   done
+fi
+
 #----------------------------------------------------------
 # Configure/update Accumulo classpath, set auths, etc
 #----------------------------------------------------------

pom.xml

Lines changed: 1 addition & 2 deletions
@@ -48,8 +48,7 @@
 <sonar.exclusions>**/StandardLexer.java,**/*.js,**/*.css,**/*.html</sonar.exclusions>
 <sonar.sourceEncoding>UTF-8</sonar.sourceEncoding>
 <surefire.forkCount>1C</surefire.forkCount>
-<!-- Tests that use MiniAccumuloCluster hang when trying to bump version.accumulo to 2.1.3 -->
-<version.accumulo>2.1.2</version.accumulo>
+<version.accumulo>2.1.4-97e4684860</version.accumulo>
 <version.arquillian>1.4.1.Final</version.arquillian>
 <version.arquillian-weld-ee-embedded>1.0.0.Final</version.arquillian-weld-ee-embedded>
 <version.assertj>3.20.2</version.assertj>

properties/dev.properties

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ LIVE_CHILD_MAP_MAX_MEMORY_MB=1024
 BULK_CHILD_REDUCE_MAX_MEMORY_MB=2048
 LIVE_CHILD_REDUCE_MAX_MEMORY_MB=1024
 
-BULK_INGEST_DATA_TYPES=shardStats
+BULK_INGEST_DATA_TYPES=shardStats,wikipedia,mycsv,myjson
 LIVE_INGEST_DATA_TYPES=wikipedia,mycsv,myjson
 
 # Clear out these values if you do not want standard shard ingest.

warehouse/accumulo-extensions/src/test/java/datawave/ingest/table/balancer/ShardedTableTabletBalancerTest.java

Lines changed: 5 additions & 4 deletions
@@ -33,6 +33,7 @@
 import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
 import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
@@ -257,7 +258,7 @@ public void testNoRebalanceWithPendingMigrationsForServer() {
                 new TabletIdImpl(new KeyExtent(bar, new Text("2"), new Text("1"))),
                 new TabletIdImpl(new KeyExtent(TNAME, new Text("2"), new Text("1"))));
         //@formatter:on
-        long balanceWaitTime = testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), migrations, migrationsOut));
+        long balanceWaitTime = testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), migrations, migrationsOut, Ample.DataLevel.USER));
         assertEquals("Incorrect balance wait time reported", 5000, balanceWaitTime);
         assertTrue("Generated migrations when we had pending migrations for our table! [" + migrationsOut + "]", migrationsOut.isEmpty());
@@ -267,7 +268,7 @@ public void testNoRebalanceWithPendingMigrationsForServer() {
                 new TabletIdImpl(new KeyExtent(foo, new Text("2"), new Text("1"))),
                 new TabletIdImpl(new KeyExtent(bar, new Text("2"), new Text("1"))));
         //@formatter:on
-        balanceWaitTime = testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), migrations, migrationsOut));
+        balanceWaitTime = testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), migrations, migrationsOut, Ample.DataLevel.USER));
         assertEquals("Incorrect balance wait time reported", 5000, balanceWaitTime);
         ensureUniqueMigrations(migrationsOut);
         testTServers.applyMigrations(migrationsOut);
@@ -547,7 +548,7 @@ private void runAndCheckBalance(int numPasses) {
         ArrayList<TabletMigration> migrationsOut = new ArrayList<>();
         for (int i = 1; i <= numPasses; i++) {
             migrationsOut.clear();
-            testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), new HashSet<>(), migrationsOut));
+            testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), new HashSet<>(), migrationsOut, Ample.DataLevel.USER));
             ensureUniqueMigrations(migrationsOut);
             testTServers.applyMigrations(migrationsOut);
@@ -556,7 +557,7 @@ private void runAndCheckBalance(int numPasses) {
         }
         // Then balance one more time to make sure no migrations are returned.
         migrationsOut.clear();
-        testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), new HashSet<>(), migrationsOut));
+        testBalancer.balance(new BalanceParamsImpl(testTServers.getCurrent(), new HashSet<>(), migrationsOut, Ample.DataLevel.USER));
         assertEquals("Left with " + migrationsOut.size() + " migrations after " + numPasses + " balance attempts.", 0, migrationsOut.size());
         testTServers.checkBalance(testBalancer.getPartitioner());
     }

warehouse/ingest-configuration/src/main/resources/config/ingest-config.xml

Lines changed: 20 additions & 0 deletions
@@ -91,4 +91,24 @@
         <name>partitioner.default.delegate</name>
         <value>datawave.ingest.mapreduce.partition.MultiTableRRRangePartitioner</value>
     </property>
+
+    <property>
+        <name>datawave.ingest.splits.cache.dir</name>
+        <value>${WAREHOUSE_HDFS_NAME_NODE}/data/splitsCache</value>
+    </property>
+
+    <property>
+        <name>accumulo.config.cache.path</name>
+        <value>${WAREHOUSE_HDFS_NAME_NODE}/data/accumuloConfigCache/accConfCache.txt</value>
+    </property>
+
+    <property>
+        <name>ingest.bulk.import.mode</name>
+        <value>V2_LOAD_PLANNING</value>
+        <description>
+            Must be one of [V1, V2_LOCAL_MAPPING, V2_LOAD_PLANNING]
+            (See BulkIngestMapFileLoader.ImportMode)
+        </description>
+    </property>
+
 </configuration>
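For context on the V2_LOAD_PLANNING mode above: a bulk v2 load plan tells Accumulo which tablets each rfile overlaps. Below is a rough, hypothetical sketch of that mapping using only LoadPlan.builder()/loadFileTo(), which are public API as of Accumulo 2.1; the unreleased additions from Accumulo PR 4898 that this commit actually uses (computing the plan while the rfile is written) are not shown.

import java.util.SortedSet;

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.hadoop.io.Text;

public class LoadPlanSketch {

    // Hypothetical helper: given an rfile's first/last row and the table's
    // split points, emit one TABLE range per overlapping tablet.
    public static LoadPlan planForFile(String rfile, Text firstRow, Text lastRow, SortedSet<Text> splits) {
        LoadPlan.Builder builder = LoadPlan.builder();
        Text prev = null; // the first tablet's extent starts at -infinity
        for (Text split : splits) {
            // A tablet's extent is (prev, split]; the file overlaps it iff some
            // row in [firstRow, lastRow] falls inside that extent.
            boolean tabletStartsBeforeFileEnds = prev == null || prev.compareTo(lastRow) < 0;
            boolean tabletEndsAfterFileStarts = split.compareTo(firstRow) >= 0;
            if (tabletStartsBeforeFileEnds && tabletEndsAfterFileStarts) {
                builder.loadFileTo(rfile, LoadPlan.RangeType.TABLE, prev, split);
            }
            prev = split;
        }
        if (prev == null || prev.compareTo(lastRow) < 0) {
            // The last tablet's extent is (prev, +infinity); a null endRow means unbounded.
            builder.loadFileTo(rfile, LoadPlan.RangeType.TABLE, prev, null);
        }
        return builder.build();
    }
}

This is also where the commit's SortedList and its per-key/value cost claim connect: with the split points fixed and known to be sorted up front, the extent covering each key can be tracked incrementally in roughly constant time rather than found with a log(N) search per key.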
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %p [%c{1.}] [%t-%tid] %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.hadoop">
+        <level value="info"/>
+    </logger>
+
+    <root>
+        <priority value="debug"/>
+        <appender-ref ref="CONSOLE"/>
+    </root>
+</log4j:configuration>
