Skip to content
This repository was archived by the owner on Nov 14, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions automation/compareWithSpark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/bin/bash
# Benchmark driver: runs the same suite of jobs first on Spark, then on
# Flink, recording the wall-clock runtime of each job. Appends one CSV
# row per system and finally renders a comparison plot.
#
# Requires: configDefaults.sh (defines FILES_DIRECTORY) and the
# individual runSpark-* / run*-JAPI.sh job scripts in this directory.
#
# NOTE: the original shebang was /bin/sh while the script used the
# bash-only [[ ]] test — under dash that is a hard failure, so the
# shebang is now explicitly bash.

. ./configDefaults.sh

PERFORMANCE_DIR="$FILES_DIRECTORY/performance"
RESULT_CSV="$PERFORMANCE_DIR/compExecutiontime.csv"

# -p: create if missing, no error if it already exists (replaces the
# old  if [[ ! -e ]]; then mkdir; fi  dance).
mkdir -p "$PERFORMANCE_DIR"

# run_timed <script> <varname>
# Runs <script> and stores its elapsed wall-clock time in whole seconds
# in the variable named <varname>. printf -v is used (instead of command
# substitution) so the job's own stdout/stderr pass through untouched.
run_timed() {
  local start end
  start=$(date +%s)
  "$1"
  end=$(date +%s)
  printf -v "$2" '%s' "$((end - start))"
}

# Header text is consumed by plotComparison.py readers; kept verbatim.
echo "system,word count,wc without combine, connected components,kmeans(low dimension),kmeans(high dimension),TPCH3,Page Rank" > "$RESULT_CSV"

# --- Spark runs ---
run_timed ./runSpark-WC-Java.sh          secWC_spark
run_timed ./runSpark-WCgrouping-Java.sh  secWCWithoutCombine_spark
run_timed ./runSpark-CP-Java.sh          secCP_spark
run_timed ./runSpark-KMeansLowD-Java.sh  secKMeansLowDimension_spark
run_timed ./runSpark-KMeansHighD-Java.sh secKMeansHighDimension_spark
run_timed ./runSpark-TPCH3-Java.sh       secTPCH3_spark
run_timed ./runSpark-PageRank-Java.sh    secPageRank_spark

echo "spark,$secWC_spark,$secWCWithoutCombine_spark,$secCP_spark,$secKMeansLowDimension_spark,$secKMeansHighDimension_spark,$secTPCH3_spark,$secPageRank_spark" >> "$RESULT_CSV"

# --- Flink runs ---
run_timed ./runWC-JAPI.sh                  secWC
run_timed ./runWC-JAPI-withoutCombine.sh   secWCWithoutCombine
run_timed ./runCP-JAPI.sh                  secCP
run_timed ./runKMeansLowDimension-JAPI.sh  secKMeansLowDimension
run_timed ./runKMeansHighDimension-JAPI.sh secKMeansHighDimension
run_timed ./runTPCH3-JAPI.sh               secTPCH3
run_timed ./runPageRank-JAPI.sh            secPageRank

echo "flink,$secWC,$secWCWithoutCombine,$secCP,$secKMeansLowDimension,$secKMeansHighDimension,$secTPCH3,$secPageRank" >> "$RESULT_CSV"

python plotComparison.py "$RESULT_CSV"
4 changes: 3 additions & 1 deletion automation/configDefaults.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ HDFS_CP=$HDFS_WORKING_DIRECTORY"/cp-in"
HDFS_CP_VERTEX=$HDFS_CP"/vertex.txt"
HDFS_CP_EDGE=$HDFS_CP"/edge.txt"
HDFS_CP_OUT=$HDFS_WORKING_DIRECTORY"/cp-out-"$RAND
HDFS_SPARK_CP_OUT=$HDFS_WORKING_DIRECTORY"/spark-cp-out-"$RAND

#KMeans
FILES_KMEANS_LOW_GEN_POINT=$FILES_DIRECTORY"/kmeans-data/point-low.txt"
Expand All @@ -81,6 +82,7 @@ FILES_PAGERANK_GEN_VERTEX=$FILES_DIRECTORY"/pagerank-data/vertex.txt"
FILES_PAGERANK_GEN_EDGE=$FILES_DIRECTORY"/pagerank-data/edge.txt"
HDFS_PAGERANK=$HDFS_WORKING_DIRECTORY"/pagerank-in"
HDFS_PAGERANK_OUT=$HDFS_WORKING_DIRECTORY"/pagerank-out-"$RAND
HDFS_SPARK_PAGERANK_OUT=$HDFS_WORKING_DIRECTORY"/spark-pagerank-out-"$RAND
HDFS_PAGERANK_PAGES=$HDFS_PAGERANK"/vertex.txt"
HDFS_PAGERANK_LINKS=$HDFS_PAGERANK"/edge.txt"

Expand All @@ -97,7 +99,7 @@ HDFS_TPCH10_OUT=$HDFS_WORKING_DIRECTORY"/tpch10-out-"$RAND
#TPCH3
HDFS_TPCH3=$HDFS_TESTJOB
HDFS_TPCH3_OUT=$HDFS_WORKING_DIRECTORY"/tpch3-out-"$RAND

HDFS_SPARK_TPCH3_OUT=$HDFS_WORKING_DIRECTORY"/spark-tpch3-out-"$RAND

# overwrite defaults by custom config
if [[ `basename $BASH_SOURCE` == "configDefaults.sh" ]] ; then
Expand Down
2 changes: 1 addition & 1 deletion automation/generateDataAll.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ echo "Generating Connected Components data."
./generateCPdata.sh

echo "Generating Page Rank data."
./generatePRdata.sh
./generatePRdata.sh 1000000 1000000

echo "Generating TPCH data."
./generateTestjobData.sh
55 changes: 55 additions & 0 deletions automation/plotComparison.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import numpy as np
import matplotlib.pyplot as plt
import sys


# CSV produced by compareWithSpark.sh: one header row, then one row per
# system (spark first, flink second). After the transpose, data[metric]
# is a 2-element array: [spark_seconds, flink_seconds].
filename = sys.argv[1]
data = np.loadtxt(filename, delimiter=',', skiprows=1,
                  usecols=(1, 2, 3, 4, 5, 6, 7)).T

fig, ax = plt.subplots(2, 3)


def plot_pair(axis, title, row):
    """Draw a single spark-vs-flink bar comparison for one metric row.

    Bars are passed positionally because the ``left=`` keyword of
    ``Axes.bar`` was renamed to ``x`` in matplotlib 2.x and removed in
    3.0; positional args work on both old and new versions.
    """
    axis.set_title(title)
    axis.set_xlim(-1, 2)
    axis.set_xticks([0, 1])
    axis.set_xticklabels(["spark", "flink"])
    axis.bar((0, 1), (data[row][0], data[row][1]), align="center",
             width=0.5, color=["orange", "blue"], yerr=0.01)


# Word Count is special: grouped bars showing with/without combine for
# each system (metric rows 0 and 1).
ax[0][0].set_title("Word Count")
ax[0][0].set_xlim(-1, 3.5)
ax[0][0].set_xticks([0.5, 2.5])
ax[0][0].set_xticklabels(["spark", "flink"])
with_combine = ax[0][0].bar((0, 2), (data[0][0], data[0][1]),
                            width=0.5, color="orange", yerr=0.01)
without_combine = ax[0][0].bar((0.5, 2.5), (data[1][0], data[1][1]),
                               width=0.5, color="blue", yerr=0.01)
ax[0][0].legend([with_combine, without_combine],
                ["w/ combine", "w/o combine"],
                loc='best', fancybox=True, fontsize=10)

plot_pair(ax[0][1], "Connected Components", 2)
plot_pair(ax[0][2], "TPCH 3", 5)
plot_pair(ax[1][0], "KMeans(D=3, K=8)", 3)
plot_pair(ax[1][1], "KMeans(D=1000, K=800)", 4)
plot_pair(ax[1][2], "Page Rank", 6)

ax[0][0].set_ylabel("Runtime(s)")
ax[1][0].set_ylabel("Runtime(s)")
for i in range(2):
    for j in range(3):
        ax[i][j].grid(True)
plt.show()

5 changes: 3 additions & 2 deletions automation/prepareSpark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ echo "Going into Spark dir, fetching and checking out."
cd spark
git remote set-url origin $SPARK_GIT_REPO
git fetch origin
git checkout origin/$SPARK_GIT_BRANCH
git checkout $SPARK_GIT_BRANCH


echo "building spark"
#$MVN_BIN clean install -DskipTests -Dmaven.javadoc.skip=true $CUSTOM_FLINK_MVN
eval "sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly"
#eval "sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly"
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
cd $FILES_DIRECTORY


Expand Down
2 changes: 1 addition & 1 deletion automation/runKMeansHighDimension-JAPI.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ echo "Running KMeans High Dimension"
ARGS="$HDFS_KMEANS/point-high.txt $HDFS_KMEANS/center-high.txt $HDFS_KMEANS_OUT/high 100"
echo "running KMeans with args $ARGS"

$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/target/flink-perf-*.jar" \
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/flink-jobs/target/flink-jobs-*-SNAPSHOT.jar" \
-c com.github.projectflink.testPlan.KMeansArbitraryDimension $ARGS

2 changes: 1 addition & 1 deletion automation/runKMeansLowDimension-JAPI.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ echo "Running KMeans Low Dimension"
ARGS="$HDFS_KMEANS/point-low.txt $HDFS_KMEANS/center-low.txt $HDFS_KMEANS_OUT/low 100"
echo "running KMeans with args $ARGS"

$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/target/flink-perf-*.jar" \
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/flink-jobs/target/flink-jobs-*-SNAPSHOT.jar" \
-c com.github.projectflink.testPlan.KMeansArbitraryDimension $ARGS

5 changes: 3 additions & 2 deletions automation/runPageRank-JAPI.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ echo "Running Page Rank"

. ./configDefaults.sh

ARGS="$HDFS_PAGERANK_PAGES $HDFS_PAGERANK_LINKS $HDFS_PAGERANK_OUT 100000 10"
ARGS="$HDFS_PAGERANK_PAGES $HDFS_PAGERANK_LINKS $HDFS_PAGERANK_OUT 1000000 10"
echo "running Page Rank with args $ARGS"

$FLINK_BUILD_HOME"/bin/flink" run -p $DOP -j $FLINK_BUILD_HOME/examples/flink-java-examples-*-PageRankBasic.jar $ARGS
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/flink-jobs/target/flink-jobs-*-SNAPSHOT.jar" \
-c com.github.projectflink.testPlan.PageRank $ARGS

14 changes: 14 additions & 0 deletions automation/runSpark-CP-java.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
# Submit the Spark Connected Components benchmark job to the local
# standalone Spark master.

echo "Running Spark Connected Components"

. ./configDefaults.sh

# $(...) instead of backticks; quote the expansion.
HOST=$(hostname)
MASTER="spark://$HOST:7077"

# Job args: <master> <vertex file> <edge file> <output dir> <max iterations>
ARGS="$MASTER $HDFS_CP/vertex.txt $HDFS_CP/edge.txt $HDFS_SPARK_CP_OUT 1000"
echo "running connected components with args $ARGS"

# The jar path is a glob the shell expands directly — no need to parse
# the output of `ls`. ARGS is intentionally unquoted: it is a
# space-separated argument list that must word-split.
"$SPARK_HOME"/bin/spark-submit --master "$MASTER" \
  --class com.github.projectflink.spark.ConnectedComponents \
  "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar \
  $ARGS
2 changes: 1 addition & 1 deletion automation/runSpark-KMeansHighD-java.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ echo "running KMeans with args $ARGS"

$SPARK_HOME/bin/spark-submit --master spark://$HOST:7077 \
--class com.github.projectflink.spark.KMeansArbitraryDimension \
`ls "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*.jar` \
`ls "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar` \
$ARGS

2 changes: 1 addition & 1 deletion automation/runSpark-KMeansLowD-java.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ echo "running KMeans with args $ARGS"

$SPARK_HOME/bin/spark-submit --master spark://$HOST:7077 \
--class com.github.projectflink.spark.KMeansArbitraryDimension \
`ls "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*.jar` \
`ls "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar` \
$ARGS
14 changes: 14 additions & 0 deletions automation/runSpark-PageRank-java.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
# Submit the Spark Page Rank benchmark job to the local standalone
# Spark master.

echo "Running Spark Page Rank"

. ./configDefaults.sh

# $(...) instead of backticks; quote the expansion.
HOST=$(hostname)
MASTER="spark://$HOST:7077"

# Job args: <master> <pages file> <links file> <output dir> <num pages> <iterations>
ARGS="$MASTER $HDFS_PAGERANK_PAGES $HDFS_PAGERANK_LINKS $HDFS_SPARK_PAGERANK_OUT 1000000 10"
echo "running page rank with args $ARGS"

# The jar path is a glob the shell expands directly — no need to parse
# the output of `ls`. ARGS is intentionally unquoted: it is a
# space-separated argument list that must word-split.
"$SPARK_HOME"/bin/spark-submit --master "$MASTER" \
  --class com.github.projectflink.spark.PageRank \
  "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar \
  $ARGS
14 changes: 14 additions & 0 deletions automation/runSpark-TPCH3-java.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
# Submit the Spark TPCH Query 3 benchmark job to the local standalone
# Spark master.

echo "Running Spark TPCH Query 3"

. ./configDefaults.sh

# $(...) instead of backticks; quote the expansion.
HOST=$(hostname)
MASTER="spark://$HOST:7077"

# Job args: <master> <lineitem> <customer> <orders> <output dir>
ARGS="$MASTER $HDFS_TPCH3/lineitem.tbl $HDFS_TPCH3/customer.tbl $HDFS_TPCH3/orders.tbl $HDFS_SPARK_TPCH3_OUT"
echo "running TPCH Query 3 with args $ARGS"

# The jar path is a glob the shell expands directly — no need to parse
# the output of `ls`. ARGS is intentionally unquoted: it is a
# space-separated argument list that must word-split.
"$SPARK_HOME"/bin/spark-submit --master "$MASTER" \
  --class com.github.projectflink.spark.TPCHQuery3 \
  "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar \
  $ARGS
4 changes: 2 additions & 2 deletions automation/runSpark-WC-Java.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ echo "Running Spark wordcount example"

HOST=`hostname`
$SPARK_HOME/bin/spark-submit --master spark://$HOST:7077 \
--deploy-mode cluster --class com.github.projectflink.spark.WordCount \
`ls "$TESTJOB_HOME"/target/flink-perf-*-SparkWC.jar` \
--class com.github.projectflink.spark.WordCount \
`ls "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SparkWC.jar` \
spark://$HOST:7077 $HDFS_WC $HDFS_SPARK_WC_OUT


13 changes: 13 additions & 0 deletions automation/runSpark-WCgrouping-Java.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Submit the Spark word-count-with-grouping benchmark (the "without
# combine" variant) to the local standalone Spark master.

echo "Running Spark wordcount grouping example"

. ./configDefaults.sh

# $(...) instead of backticks; quote the expansion.
HOST=$(hostname)
MASTER="spark://$HOST:7077"

# The jar path is a glob the shell expands directly — no need to parse
# the output of `ls`.
# Trailing args: <master> <input> <output> <parallelism>
"$SPARK_HOME"/bin/spark-submit --master "$MASTER" \
  --class com.github.projectflink.spark.WordCountGrouping \
  "$TESTJOB_HOME"/spark-jobs/target/spark-jobs-*-SNAPSHOT.jar \
  "$MASTER" "$HDFS_WC" "$HDFS_SPARK_WC_OUT" 1


2 changes: 1 addition & 1 deletion automation/runTPCH3-JAPI.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ echo "Running TPCH-3"

ARGS="$HDFS_TPCH3/lineitem.tbl $HDFS_TPCH3/customer.tbl $HDFS_TPCH3/orders.tbl $HDFS_TPCH3_OUT"
echo "running TPCH-3 with args $ARGS"
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/target/flink-perf-*.jar" \
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP $TESTJOB_HOME"/flink-jobs/target/flink-jobs-*-SNAPSHOT.jar" \
-c com.github.projectflink.testPlan.TPCHQuery3 $ARGS

5 changes: 2 additions & 3 deletions automation/runWC-JAPI-withoutCombine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ echo "Running wordcount japi without combine example"

. ./configDefaults.sh

ARGS="$HDFS_WC $HDFS_WC_OUT"
echo "running wc with args $ARGS"
$FLINK_BUILD_HOME"/bin/flink" run -p $DOP -c com.github.projectflink.testPlan.WordCountWithoutCombine -j $TESTJOB_HOME"/target/flink-perf-*.jar" $ARGS
echo "running wc with args"
$FLINK_BUILD_HOME"/bin/flink" run -c com.github.projectflink.testPlan.WordCountWithoutCombine -p $DOP $TESTJOB_HOME"/flink-jobs/target/flink-jobs-*-SNAPSHOT.jar" $HDFS_WC $HDFS_WC_OUT
24 changes: 1 addition & 23 deletions flink-jobs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

<modelVersion>4.0.0</modelVersion>

<groupId>com.github.project-flink</groupId>
<version>0.1-SNAPSHOT</version>
<artifactId>flink-jobs</artifactId>
<name>flink-jobs</name>

Expand All @@ -32,7 +30,7 @@
</repositories>

<properties>
<flink.version>0.6-incubating-SNAPSHOT</flink.version>
<flink.version>0.7-incubating-SNAPSHOT</flink.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -102,26 +100,6 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>Spark_WC</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>SparkWC</classifier>

<includes>
<include>**/spark/WordCount*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple12;
Expand Down
Loading