Skip to content

Beam backend: use TypedPipe descriptions as names for PTransforms #1983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ object OptimizationRules {
case MergedTypedPipe(a, EmptyTypedPipe) => a
case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe
case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe
case TrappedPipe(EmptyTypedPipe, _) => EmptyTypedPipe
case TrappedPipe(EmptyTypedPipe, _) => EmptyTypedPipe
case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe
case WithOnComplete(EmptyTypedPipe, _) =>
EmptyTypedPipe // there is nothing to do, so we never have workers complete
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package com.twitter.scalding.beam_backend

import com.twitter.scalding.dagon.{FunctionK, Memoize, Rule}
import com.twitter.chill.KryoInstantiator
import com.twitter.chill.config.ScalaMapConfig
import com.twitter.scalding.Config
import com.twitter.scalding.beam_backend.BeamOp.{CoGroupedOp, MergedBeamOp}
import com.twitter.scalding.dagon.{FunctionK, Memoize, Rule}
import com.twitter.scalding.serialization.KryoHadoop
import com.twitter.scalding.typed.OptimizationRules._
import com.twitter.scalding.typed._
import com.twitter.scalding.typed.cascading_backend.CascadingExtensions.ConfigCascadingExtensions
import com.twitter.scalding.typed.functions.{
FilterKeysToFilter,
FlatMapValuesToFlatMap,
MapValuesToMap,
ScaldingPriorityQueueMonoid
}

import com.twitter.scalding.typed.cascading_backend.CascadingExtensions.ConfigCascadingExtensions

object BeamPlanner {
def plan(
config: Config,
Expand Down Expand Up @@ -61,8 +61,15 @@ object BeamPlanner {
BeamOp.Source(config, src, srcs(src))
case (IterablePipe(iterable), _) =>
BeamOp.FromIterable(iterable, kryoCoder)
case (wd: WithDescriptionTypedPipe[a], rec) =>
rec[a](wd.input)
case (wd: WithDescriptionTypedPipe[_], rec) => {
val op = rec(wd.input)
wd.descriptions match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, as commented, I think this is probably running the risk of dropping some descriptions.

I would do:

op.withName(wd.descriptions.map(_._1).mkString(", "))

otherwise I think you will wind up with cases where you lose line numbers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll try this out.

case head :: _ =>
op.withName(head._1)
case Nil =>
op
}
}
case (SumByLocalKeys(pipe, sg), rec) =>
val op = rec(pipe)
config.getMapSideAggregationThreshold match {
Expand Down Expand Up @@ -97,7 +104,10 @@ object BeamPlanner {
uir.evidence.subst[BeamOpT](sortedOp)
}
go(ivsr)
case (ReduceStepPipe(ValueSortedReduce(keyOrdering, pipe, valueSort, reduceFn, _, _)), rec) =>
case (
ReduceStepPipe(ValueSortedReduce(keyOrdering, pipe, valueSort, reduceFn, _, _)),
rec
) =>
val op = rec(pipe)
op.sortedMapGroup(reduceFn)(keyOrdering, valueSort, kryoCoder)
case (ReduceStepPipe(IteratorMappedReduce(keyOrdering, pipe, reduceFn, _, _)), rec) =>
Expand All @@ -116,7 +126,7 @@ object BeamPlanner {
val ops: Seq[BeamOp[(K, Any)]] = cg.inputs.map(tp => rec(tp))
CoGroupedOp(cg, ops)
}
go(cg)
if (cg.descriptions.isEmpty) go(cg) else go(cg).withName(cg.descriptions.last)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using last here but head above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to ask you about this myself. If you look at the test case in this PR, The cogrouped expression has two descriptions "Count words" and "Join with t1", both of which appear in the descriptions of CoGrouped. But since this appears to be appending descriptions for eg. in CoGrouped.Pair and the optimization rule ComposeDescriptions, I chose to keep the last one. Hence no assertion for "Count words" even in that test case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not combine all of them? Why not just cg.descriptions.mkString("\n") or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let me try this out on a job first. Thanks!

case (Fork(input), rec) =>
rec(input)
case (m @ MergedTypedPipe(_, _), rec) =>
Expand All @@ -137,7 +147,21 @@ object BeamPlanner {

def defaultOptimizationRules(config: Config): Seq[Rule[TypedPipe]] = {
def std(forceHash: Rule[TypedPipe]) =
OptimizationRules.standardMapReduceRules :::
List(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the change here? Is this copying the same cascading optimizations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It really is the standardMapReduceRules with DescribeLater excluded. I've mentioned in the summary of this PR that I chose to exclude that rule in order to capture the line numbers at the right points, without the optimizer changing the AST. I'm curious if there's a better way to do this though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I missed that. Note: the line numbers are already captured by the time that rule runs. The line numbers are collected on the TypedPipe. I don't see how removing that rule helps descriptions.

If I were you, I would look at all the descriptions and add the full list.

The problem with removing that rule is that it will block merging nodes together. It may be fine, maybe Beam will follow up with optimizations, but I would be careful: scalding may do some optimizations that beam doesn't.

Copy link
Contributor Author

@navinvishy navinvishy Apr 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line numbers are in the descriptions though, if users don't explicitly add them with a call to withDescription.
Here's what I'm trying to do: if we have a TypedPipe that looks something like this:
WithDescription(Mapped(WithDescription(FlatMapped(), "a")), "b")
I'm trying to associate the description "a" with the PTransform corresponding to the FlatMapped and the description "b" with the PTransform corresponding to the Mapped. By default, the "a" and "b" here are line numbers unless the user explicitly added their own descriptions.
If I let DescribeLater run, I think we'll end up with something like this:
WithDescription(FlatMapped(), ["a","b"])
(since this would also compose the Mapped and FlatMapped, and then also run ComposeDescriptions)
This would be hard to debug, when there is an error in this stage in the job since it's hard to map back to the code.
But as I write this, I'm thinking if its probably better to just concatenate the line numbers instead...
Dataflow (which is the Beam runner we're using) does apply similar optimizations (I've mentioned this in the summary), but I'm debating now if its a good idea to change what the optimizer does, since it definitely makes the optimizer less useful. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really know the answer. So, line numbers are added to descriptions when the user constructs the TypedPipe, see:

private[scalding] def withLine: TypedPipe[T] =

The idea of DescribeLater is to try to merge as many operations into one step and then put all the line numbers/descriptions on that single step. By taking just a single item you are hiding which actions might be combined.

As to what to do, I don't know. If beam's optimizer is very good, it maybe doesn't matter. Maybe you should try to separate runs with a somewhat complex job and compare?

Also, I can imagine a Config setting like scalding.preserve_description_order that removes applying that optimization. Then if you were debugging, you could set that to true, but for a production job you could set it to false.

I would probably bias to just combining all the descriptions into a single beam description unless you actually see problems. You can always come back and add that setting. I would personally bias to maximizing the utility of the optimizer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Let me try out some jobs with the rule enabled and showing all the descriptions.

// phase 0, add explicit forks to not duplicate pipes on fanout below
AddExplicitForks,
RemoveUselessFork,
// phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
IgnoreNoOpGroup.orElse(composeSame).orElse(FilterKeysEarly).orElse(DeferMerge),
// phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
composeIntoFlatMap
.orElse(simplifyEmpty)
.orElse(DiamondToFlatMap)
.orElse(ComposeDescriptions)
.orElse(MapValuesInReducers),
// phase 3, remove duplicates forces/forks (e.g. .fork.fork or .forceToDisk.fork, ....)
RemoveDuplicateForceFork
) :::
List(
OptimizationRules.FilterLocally, // after filtering, we may have filtered to nothing, lets see
OptimizationRules.simplifyEmpty,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package com.twitter.scalding.beam_backend

import com.twitter.scalding.dagon.Memoize
import com.twitter.algebird.Semigroup
import com.twitter.scalding.Config
import com.twitter.scalding.beam_backend.BeamFunctions._
import com.twitter.scalding.beam_backend.BeamJoiner.MultiJoinFunction
import com.twitter.scalding.dagon.Memoize
import com.twitter.scalding.serialization.Externalizer
import com.twitter.scalding.typed.{CoGrouped, Input}
import com.twitter.scalding.typed.functions.ComposedFunctions.ComposedMapGroup
import com.twitter.scalding.typed.functions.{EmptyGuard, MapValueStream, ScaldingPriorityQueueMonoid, SumAll}
import com.twitter.scalding.typed.{CoGrouped, Input}
import java.util.{Comparator, PriorityQueue}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.{Coder, IterableCoder, KvCoder}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.Top.TopCombineFn
import org.apache.beam.sdk.transforms._
import org.apache.beam.sdk.transforms.join.{CoGbkResult, CoGroupByKey, KeyedPCollectionTuple}
import org.apache.beam.sdk.values.PCollectionList
import org.apache.beam.sdk.values.PCollectionTuple
import org.apache.beam.sdk.values.{KV, PCollection, TupleTag}
import org.apache.beam.sdk.values.{KV, PCollection, PCollectionList, PCollectionTuple, TupleTag}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

Expand Down Expand Up @@ -55,6 +53,8 @@ sealed abstract class BeamOp[+A] {

def flatMap[B](f: A => TraversableOnce[B])(implicit kryoCoder: KryoCoder): BeamOp[B] =
parDo(FlatMapFn(f), "flatMap")

def withName(name: String): BeamOp[A]
}

private final case class SerializableComparator[T](comp: Comparator[T]) extends Comparator[T] {
Expand Down Expand Up @@ -136,11 +136,16 @@ object BeamOp extends Serializable {
)
case Some(src) => src.read(pipeline, conf)
}

override def withName(name: String): BeamOp[A] = this
}

final case class FromIterable[A](iterable: Iterable[A], kryoCoder: KryoCoder) extends BeamOp[A] {
final case class FromIterable[A](iterable: Iterable[A], kryoCoder: KryoCoder, name: Option[String] = None)
extends BeamOp[A] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: A] =
pipeline.apply(Create.of(iterable.asJava).withCoder(kryoCoder))
pipeline.apply(name.getOrElse("Iterable source"), Create.of(iterable.asJava).withCoder(kryoCoder))

override def withName(name: String): BeamOp[A] = FromIterable(iterable, kryoCoder, Some(name))
}

final case class TransformBeamOp[A, B](
Expand All @@ -153,6 +158,8 @@ object BeamOp extends Serializable {
val pCollection: PCollection[A] = widenPCollection(source.run(pipeline))
pCollection.apply(name, f).setCoder(kryoCoder)
}

override def withName(desc: String): BeamOp[B] = TransformBeamOp(source, f, kryoCoder, desc)
}

final case class HashJoinTransform[K, V, U, W](
Expand Down Expand Up @@ -184,7 +191,8 @@ object BeamOp extends Serializable {
final case class HashJoinOp[K, V, U, W](
left: BeamOp[(K, V)],
right: BeamOp[(K, U)],
joiner: (K, V, Iterable[U]) => Iterator[W]
joiner: (K, V, Iterable[U]) => Iterator[W],
name: Option[String] = None
)(implicit kryoCoder: KryoCoder, ordK: Ordering[K])
extends BeamOp[(K, W)] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: (K, W)] = {
Expand All @@ -199,20 +207,28 @@ object BeamOp extends Serializable {
widenPCollection(rightPCollection): PCollection[(K, _)]
)

tuple.apply(HashJoinTransform(keyCoder, joiner))
tuple.apply(name.getOrElse("HashJoin"), HashJoinTransform(keyCoder, joiner))
}

override def withName(name: String): BeamOp[(K, W)] = HashJoinOp(left, right, joiner, Some(name))
}

final case class MergedBeamOp[A](first: BeamOp[A], second: BeamOp[A], tail: Seq[BeamOp[A]])
extends BeamOp[A] {
final case class MergedBeamOp[A](
first: BeamOp[A],
second: BeamOp[A],
tail: Seq[BeamOp[A]],
name: Option[String] = None
) extends BeamOp[A] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: A] = {
val collections = PCollectionList
.of(widenPCollection(first.run(pipeline)): PCollection[A])
.and(widenPCollection(second.run(pipeline)): PCollection[A])
.and(tail.map(op => widenPCollection(op.run(pipeline)): PCollection[A]).asJava)

collections.apply(Flatten.pCollections[A]())
collections.apply(name.getOrElse("Merge"), Flatten.pCollections[A]())
}

override def withName(name: String): BeamOp[A] = MergedBeamOp(first, second, tail, Some(name))
}

final case class CoGroupedTransform[K, V](
Expand Down Expand Up @@ -241,7 +257,8 @@ object BeamOp extends Serializable {

final case class CoGroupedOp[K, V](
cg: CoGrouped[K, V],
inputOps: Seq[BeamOp[(K, Any)]]
inputOps: Seq[BeamOp[(K, Any)]],
name: Option[String] = None
)(implicit kryoCoder: KryoCoder)
extends BeamOp[(K, V)] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: (K, V)] = {
Expand All @@ -256,8 +273,10 @@ object BeamOp extends Serializable {

PCollectionList
.of(pcols.asJava)
.apply(CoGroupedTransform(joinFunction, tupleTags, keyCoder))
.apply(name.getOrElse("CoGrouped"), CoGroupedTransform(joinFunction, tupleTags, keyCoder))
}

override def withName(name: String): BeamOp[(K, V)] = CoGroupedOp(cg, inputOps, Some(name))
}

final case class CoGroupDoFn[K, V](
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package com.twitter.scalding.beam_backend

import com.twitter.scalding.dagon.Rule
import com.twitter.algebird.{AveragedValue, Semigroup}
import com.twitter.scalding.Execution.ToWrite
import com.twitter.scalding.Execution.ToWrite.SimpleWrite
import com.twitter.scalding.TypedTsv
import com.twitter.scalding.beam_backend.BeamOp.{CoGroupedOp, FromIterable, HashJoinOp, MergedBeamOp}
import com.twitter.scalding.{Config, Execution, TextLine, TypedPipe}
import java.io.File
import java.nio.file.Paths
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.Pipeline.PipelineVisitor
import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory}
import org.apache.beam.sdk.runners.TransformHierarchy
import org.apache.beam.sdk.values.PValue
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.immutable
import scala.collection.mutable
import scala.io.Source

class BeamBackendTests extends FunSuite with BeforeAndAfter {
Expand Down Expand Up @@ -473,6 +483,42 @@ class BeamBackendTests extends FunSuite with BeforeAndAfter {
assert(output.toSet == Seq((5, 3), (10, 3)).toSet)
}

test("BeamOp naming: named PTransforms") {
class TransformNameVisitor extends PipelineVisitor.Defaults {
private var transformNames: mutable.Set[String] = mutable.Set[String]()

override def visitPrimitiveTransform(node: TransformHierarchy#Node): Unit =
transformNames.add(node.getFullName)

def getTransformNames(): mutable.Set[String] = transformNames
}

case class WordCount(a: String, b: Long)

val t1 = TypedPipe.from(Seq(("a", 1L), ("b", 2L)))
val pipe = TypedPipe
.from(Seq("the quick brown fox jumps"))
.withDescription("Read data")
.flatMap(s => s.split(" "))
.withDescription("Convert to words")
.map(tag => (tag, 1L))
.sumByKey
.withDescription("Count words")
.join(t1)
.withDescription("Join with t1")
.map(keyval => WordCount(keyval._1, keyval._2._1)) ++ t1
val (pipeline, op) = beamUnoptimizedPlan(pipe)
op.run(pipeline)
val visitor = new TransformNameVisitor()
pipeline.traverseTopologically(visitor)
val names = visitor.getTransformNames()
assert(
names.exists(_.contains("Read data")) && names.exists(_.contains("Convert to words")) && names.exists(
_.contains("Join with t1")
)
)
}

private def getContents(path: String, prefix: String): List[String] =
new File(path).listFiles.flatMap { file =>
if (file.getPath.startsWith(prefix)) {
Expand Down