Skip to content

Commit 0eddce0

Browse files
committed
Merge pull request #1338 from MansurAshraf/mashraf/thrift_macro
Ordered Serialization macros for thrift
2 parents eda6ea6 + d941aee commit 0eddce0

File tree

19 files changed

+1418
-2
lines changed

19 files changed

+1418
-2
lines changed

.travis.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ matrix:
5454
env: BUILD="base" TEST_TARGET="scalding-serialization"
5555
script: "scripts/run_test.sh"
5656

57+
- scala: 2.10.5
58+
env: BUILD="base" TEST_TARGET="scalding-thrift-macros"
59+
script: "scripts/run_test.sh"
60+
61+
- scala: 2.11.7
62+
env: BUILD="base" TEST_TARGET="scalding-thrift-macros"
63+
script: "scripts/run_test.sh"
64+
5765
- scala: 2.10.5
5866
env: BUILD="test tutorials and matrix tutorials and repl" TEST_TARGET="scalding-repl"
5967
script:

project/Build.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
88
import com.typesafe.tools.mima.plugin.MimaKeys._
99
import scalariform.formatter.preferences._
1010
import com.typesafe.sbt.SbtScalariform._
11+
import com.twitter.scrooge.ScroogeSBT
1112

1213
import scala.collection.JavaConverters._
1314

@@ -217,7 +218,8 @@ object ScaldingBuild extends Build {
217218
scaldingDb,
218219
maple,
219220
executionTutorial,
220-
scaldingSerialization
221+
scaldingSerialization,
222+
scaldingThriftMacros
221223
)
222224

223225
lazy val scaldingAssembly = Project(
@@ -527,4 +529,34 @@ object ScaldingBuild extends Build {
527529
addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full)
528530
).dependsOn(scaldingCore)
529531

532+
// sbt module for the Scrooge/thrift OrderedSerialization macros ("scalding-thrift-macros").
lazy val scaldingThriftMacros = module("thrift-macros")
  .settings(ScroogeSBT.newSettings:_*)
  .settings(
    // Scrooge reads its .thrift sources from src/test/resources.
    // NOTE(review): this is set in the Compile scope, so the generated classes presumably end up
    // on the main classpath rather than only the test one; confirm that this is intended.
    ScroogeSBT.scroogeThriftSourceFolder in Compile <<= baseDirectory {
      base => base / "src/test/resources"
    },
    // Run Scrooge code generation before compiling.
    compile in Compile <<= (compile in Compile) dependsOn (ScroogeSBT.scroogeGen in Compile),
    libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
      "org.scala-lang" % "scala-library" % scalaVersion,
      "org.scala-lang" % "scala-reflect" % scalaVersion,
      "com.twitter" %% "bijection-macros" % bijectionVersion,
      "com.twitter" % "chill-thrift" % chillVersion % "test",
      "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided",
      "org.apache.thrift" % "libthrift" % thriftVersion,
      // hadoop-client and hadoop-minicluster were each listed twice; one declaration is enough.
      "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test",
      "org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion % "test",
      "org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests",
      "org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion % "test",
      "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests",
      "org.apache.hadoop" % "hadoop-common" % hadoopVersion classifier "tests",
      "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests"
      // The quasiquotes library is only added when building against Scala 2.10.x.
    ) ++ (if (isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % "2.0.1") else Seq())
    },
    addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full)
  ).dependsOn(
    scaldingCore,
    scaldingHadoopTest % "test",
    scaldingSerialization)
530562
}

project/plugins.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.
22

33
resolvers ++= Seq(
44
"jgit-repo" at "http://download.eclipse.org/jgit/maven",
5-
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases"
5+
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases",
6+
"Twitter Maven" at "http://maven.twttr.com"
67
)
78

89
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
@@ -14,3 +15,5 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
1415
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")
1516

1617
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
18+
19+
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "3.16.3")
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.twitter.scalding.serialization
2+
3+
import com.twitter.scalding._
4+
5+
import scala.language.experimental.{ macros => smacros }
6+
7+
/**
8+
* RequiredBinaryComparators provide comparators (or Ordering in Scala) that are capable of comparing keys in their
9+
* serialized form reducing the amount of time spent in serialization/deserialization. These comparators are implemented
10+
* using Scala macros, and currently provide binary comparators for primitives, strings, Options, tuples, collections, case classes
11+
* and Scrooge objects.
12+
*/
13+
trait RequiredBinaryComparators extends RequiredBinaryComparatorsConfig {
14+
15+
// Implicit macro: every implicit lookup of OrderedSerialization[T] inside a class mixing in this
// trait is expanded at compile time by OrderedSerializationProviderImpl for the requested T.
implicit def ordSer[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]
16+
17+
}
18+
19+
object RequiredBinaryComparators {
20+
21+
// Same macro implementation (OrderedSerializationProviderImpl) as the trait's `ordSer`, exposed as
// an object member. Unlike the trait, this object does not extend RequiredBinaryComparatorsConfig,
// so using it does not by itself change any job configuration.
implicit def orderedSerialization[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]
22+
}
23+
24+
/**
 * ExecutionApp flavour of the required-binary-comparators mix-in.
 *
 * Provides the implicit `OrderedSerialization` macro and marks ordered serialization as required
 * on the `Config` produced by `config`.
 *
 * NOTE(review): the type parameter `K` is not referenced anywhere in this trait; it is kept only
 * so existing `extends RequiredBinaryComparatorsExecutionApp[...]` declarations keep compiling.
 */
trait RequiredBinaryComparatorsExecutionApp[K] extends ExecutionApp {
  // Implicit macro expanded by OrderedSerializationProviderImpl for each requested T.
  implicit def ordSer[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]

  /**
   * Delegates to the parent `config` and returns its `Config` with
   * `setRequireOrderedSerialization(true)` applied; the `Mode` is passed through untouched.
   */
  override def config(inputArgs: Array[String]): (Config, Mode) =
    super.config(inputArgs) match {
      case (baseConf, mode) => (baseConf.setRequireOrderedSerialization(true), mode)
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.twitter.scalding.serialization
2+
3+
import com.twitter.scalding.{ Config, Job }
4+
5+
/**
 * Job mix-in that adds `Config.ScaldingRequireOrderedSerialization -> "true"` to the job's
 * configuration on top of whatever the parent `config` already contains.
 */
trait RequiredBinaryComparatorsConfig extends Job {
  override def config =
    super.config.updated(Config.ScaldingRequireOrderedSerialization, "true")
}

scalding-thrift-macros/NOTICE

Whitespace-only changes.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
Copyright 2014 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.scalding.thrift.macros
17+
18+
import com.twitter.scalding.serialization.OrderedSerialization
19+
import com.twitter.scalding.thrift.macros.impl.ScroogeInternalOrderedSerializationImpl
20+
21+
import scala.language.experimental.{ macros => sMacros }
22+
23+
object Macros {
24+
// Implicit macro entry point: expands ScroogeInternalOrderedSerializationImpl to build an
// OrderedSerialization[T] at the call site. Bring it into scope by importing it from this object.
implicit def scroogeOrdSer[T]: OrderedSerialization[T] = macro ScroogeInternalOrderedSerializationImpl[T]
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.twitter.scalding.thrift.macros
2+
3+
import com.twitter.scalding.serialization.{ OrderedSerialization, RequiredBinaryComparatorsConfig }
4+
import com.twitter.scalding.thrift.macros.impl.ScroogeInternalOrderedSerializationImpl
5+
import scala.language.experimental.{ macros => smacros }
6+
7+
/**
8+
* Provides support for Scrooge classes in addition to primitives, case classes, tuples, etc. Use this
9+
* if you use Scrooge classes as the `key` in your scalding job.
10+
* @author Mansur Ashraf.
11+
*/
12+
trait RequiredBinaryComparators extends RequiredBinaryComparatorsConfig {
13+
// Implicit macro expanded by ScroogeInternalOrderedSerializationImpl for each requested T.
implicit def ordSer[T]: OrderedSerialization[T] = macro ScroogeInternalOrderedSerializationImpl[T]
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
Copyright 2014 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.scalding.thrift.macros.impl
17+
18+
import com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl
19+
import com.twitter.scalding.serialization.macros.impl.ordered_serialization._
20+
import com.twitter.scalding.serialization.OrderedSerialization
21+
import com.twitter.scalding.thrift.macros.impl.ordered_serialization.{ ScroogeEnumOrderedBuf, ScroogeUnionOrderedBuf, ScroogeOrderedBuf, ScroogeOuterOrderedBuf }
22+
23+
import scala.language.experimental.macros
24+
import scala.reflect.macros.Context
25+
26+
/**
 * Macro implementation that selects a `TreeOrderedBuf` for a type and turns it into an
 * `OrderedSerialization[T]`. It layers the Scrooge-specific dispatchers on top of the dispatchers
 * provided by `OrderedSerializationProviderImpl`.
 */
object ScroogeInternalOrderedSerializationImpl {

  /**
   * Inner dispatcher, used for every type reached while already generating code.
   *
   * It handles the Scrooge enum and union types as well as the regular Scala types. A nested
   * thrift struct is not expanded inline: it is routed to `ScroogeOuterOrderedBuf`, which injects
   * an implicit lazy val for a new OrderedSerialization and then exits the macro, so generated
   * methods do not grow too long through inlining. A type that no dispatcher accepts aborts
   * compilation.
   */
  private def innerDispatcher(c: Context): PartialFunction[c.Type, TreeOrderedBuf[c.type]] = {
    import c.universe._

    // Deliberately a `def`: the recursive dispatcher is re-created on demand instead of being
    // built eagerly here (presumably the callees take it by name; confirm against their signatures).
    def recurse: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = ScroogeInternalOrderedSerializationImpl.innerDispatcher(c)

    val enumBufs = ScroogeEnumOrderedBuf.dispatch(c)
    val unionBufs = ScroogeUnionOrderedBuf.dispatch(c)(recurse)
    val nestedStructBufs = ScroogeOuterOrderedBuf.dispatch(c)

    val normalized = OrderedSerializationProviderImpl.normalizedDispatcher(c)(recurse)
    val scaldingBufs = OrderedSerializationProviderImpl.scaldingBasicDispatchers(c)(recurse)
    val implicitFallback = OrderedSerializationProviderImpl.fallbackImplicitDispatcher(c)

    // Last resort: nothing matched, so fail the compilation with the offending type.
    val unsupported: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = {
      case tpe: Type => c.abort(c.enclosingPosition, s"""Unable to find OrderedSerialization for type ${tpe}""")
    }

    // Order matters: the first dispatcher defined for a type wins.
    normalized orElse
      enumBufs orElse
      unionBufs orElse
      nestedStructBufs orElse
      scaldingBufs orElse
      implicitFallback orElse
      unsupported
  }

  /**
   * Outer dispatcher, only used when the macro is entered from an external (implicit or explicit)
   * call. In addition to everything the inner dispatcher supports, it can generate code for a
   * thrift struct itself through `ScroogeOrderedBuf`.
   */
  private def outerDispatcher(c: Context): PartialFunction[c.Type, TreeOrderedBuf[c.type]] = {
    def outerRecurse: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = ScroogeInternalOrderedSerializationImpl.outerDispatcher(c)
    def innerRecurse: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = ScroogeInternalOrderedSerializationImpl.innerDispatcher(c)

    val inner = innerDispatcher(c)
    // Fields of the struct are resolved with the inner dispatcher.
    val structBufs = ScroogeOrderedBuf.dispatch(c)(innerRecurse)

    OrderedSerializationProviderImpl.normalizedDispatcher(c)(outerRecurse) orElse
      structBufs orElse
      inner
  }

  /**
   * Macro entry point: resolves the buffer for `T` with the outer dispatcher and converts it into
   * an `OrderedSerialization[T]` expression.
   */
  def apply[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[OrderedSerialization[T]] = {
    val orderedBuf: TreeOrderedBuf[c.type] = outerDispatcher(c)(T.tpe)
    TreeOrderedBuf.toOrderedSerialization[T](c)(orderedBuf)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2014 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.scalding.thrift.macros.impl.ordered_serialization
17+
18+
import com.twitter.scalding.serialization.macros.impl.ordered_serialization._
19+
import com.twitter.scrooge.ThriftEnum
20+
21+
import scala.language.experimental.macros
22+
import scala.reflect.macros.Context
23+
24+
/**
 * `TreeOrderedBuf` for Scrooge enums: an enum is serialized, hashed and compared through the Int
 * exposed by its `value` member.
 */
object ScroogeEnumOrderedBuf {

  /** Partial function defined for every type that is a subtype of `ThriftEnum`. */
  def dispatch(c: Context): PartialFunction[c.Type, TreeOrderedBuf[c.type]] = {
    import c.universe._

    val pf: PartialFunction[c.Type, TreeOrderedBuf[c.type]] = {
      case tpe if tpe <:< typeOf[ThriftEnum] => ScroogeEnumOrderedBuf(c)(tpe)
    }
    pf
  }

  /**
   * Builds the code-generating buffer for the enum type `outerType`.
   *
   * The previously declared, never used `freshT` helper has been removed.
   */
  def apply(c: Context)(outerType: c.Type): TreeOrderedBuf[c.type] = {
    import c.universe._

    new TreeOrderedBuf[c.type] {
      override val ctx: c.type = c
      override val tpe = outerType

      // Binary comparison: read one positive var-int from each stream and compare them as Ints.
      override def compareBinary(inputStreamA: ctx.TermName, inputStreamB: ctx.TermName) =
        q"""
        _root_.java.lang.Integer.compare($inputStreamA.readPosVarInt, $inputStreamB.readPosVarInt)
        """

      // Hash of the enum is the Int hash of its `value`.
      override def hash(element: ctx.TermName): ctx.Tree =
        q"_root_.com.twitter.scalding.serialization.Hasher.int.hash($element.value)"

      // Serialized form is just `value`, written as a positive var-int.
      override def put(inputStream: ctx.TermName, element: ctx.TermName) =
        q"$inputStream.writePosVarInt($element.value)"

      // Deserialization goes through the `apply` of the enum type's companion.
      override def get(inputStream: ctx.TermName): ctx.Tree =
        q"${outerType.typeSymbol.companionSymbol}.apply($inputStream.readPosVarInt)"

      // In-memory comparison mirrors compareBinary: compare the two `value`s as Ints.
      override def compare(elementA: ctx.TermName, elementB: ctx.TermName): ctx.Tree =
        q"""
        _root_.java.lang.Integer.compare($elementA.value, $elementB.value) : Int
        """

      // Length is computed from the value with `posVarIntSize`, which the generated code expects
      // to find in scope at the expansion site.
      override def length(element: Tree): CompileTimeLengthTypes[c.type] = CompileTimeLengthTypes.FastLengthCalculation(c)(q"posVarIntSize($element.value)")

      // No helper values need to be hoisted out of the generated code for enums.
      override val lazyOuterVariables: Map[String, ctx.Tree] = Map.empty
    }
  }
}
64+

0 commit comments

Comments
 (0)