@@ -1034,7 +1034,7 @@ object OptimizationRules {
case MergedTypedPipe(a, EmptyTypedPipe) => a
case ReduceStepPipe(rs: ReduceStep[_, _, _]) if rs.mapped == EmptyTypedPipe => EmptyTypedPipe
case SumByLocalKeys(EmptyTypedPipe, _) => EmptyTypedPipe
case TrappedPipe(EmptyTypedPipe, _) => EmptyTypedPipe
case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe
case WithOnComplete(EmptyTypedPipe, _) =>
EmptyTypedPipe // there is nothing to do, so we never have workers complete
@@ -1,20 +1,33 @@
package com.twitter.scalding.beam_backend

import com.twitter.scalding.dagon.{FunctionK, Memoize, Rule}
import com.twitter.chill.KryoInstantiator
import com.twitter.chill.config.ScalaMapConfig
import com.twitter.scalding.Config
import com.twitter.scalding.beam_backend.BeamOp.{CoGroupedOp, MergedBeamOp}
import com.twitter.scalding.beam_backend.BeamOp.CoGroupedOp
import com.twitter.scalding.beam_backend.BeamOp.MergedBeamOp
import com.twitter.scalding.dagon.FunctionK
import com.twitter.scalding.dagon.Memoize
import com.twitter.scalding.dagon.Rule
import com.twitter.scalding.serialization.KryoHadoop
import com.twitter.scalding.typed.CoGrouped.WithDescription
import com.twitter.scalding.typed.OptimizationRules.AddExplicitForks
import com.twitter.scalding.typed.OptimizationRules.ComposeDescriptions
import com.twitter.scalding.typed.OptimizationRules.DeferMerge
import com.twitter.scalding.typed.OptimizationRules.DiamondToFlatMap
import com.twitter.scalding.typed.OptimizationRules.FilterKeysEarly
import com.twitter.scalding.typed.OptimizationRules.IgnoreNoOpGroup
import com.twitter.scalding.typed.OptimizationRules.MapValuesInReducers
import com.twitter.scalding.typed.OptimizationRules.RemoveDuplicateForceFork
import com.twitter.scalding.typed.OptimizationRules.RemoveUselessFork
import com.twitter.scalding.typed.OptimizationRules.composeIntoFlatMap
import com.twitter.scalding.typed.OptimizationRules.composeSame
import com.twitter.scalding.typed.OptimizationRules.simplifyEmpty
import com.twitter.scalding.typed._
import com.twitter.scalding.typed.functions.{
FilterKeysToFilter,
FlatMapValuesToFlatMap,
MapValuesToMap,
ScaldingPriorityQueueMonoid
}

import com.twitter.scalding.typed.cascading_backend.CascadingExtensions.ConfigCascadingExtensions
import com.twitter.scalding.typed.functions.FilterKeysToFilter
import com.twitter.scalding.typed.functions.FlatMapValuesToFlatMap
import com.twitter.scalding.typed.functions.MapValuesToMap
import com.twitter.scalding.typed.functions.ScaldingPriorityQueueMonoid

object BeamPlanner {
def plan(
@@ -61,8 +74,10 @@ object BeamPlanner {
BeamOp.Source(config, src, srcs(src))
case (IterablePipe(iterable), _) =>
BeamOp.FromIterable(iterable, kryoCoder)
case (wd: WithDescriptionTypedPipe[a], rec) =>
rec[a](wd.input)
case (wd: WithDescriptionTypedPipe[_], rec) => {
val op = rec(wd.input)
op.withName(wd.descriptions.map(_._1).head)
Collaborator:
.head can throw. Can we instead do:

wd.descriptions match {
  case head :: _ =>
    op.withName(head._1)
  case Nil =>
    op
}

Contributor Author:
Done. I thought that for a WithDescriptionTypedPipe this could not be empty.

Collaborator:
it maybe can't but we didn't make that clear in the types... :/
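For reference, a minimal sketch of the resolved case, folding the suggested match into the diff (same names as the PR; descriptions is a List[(String, Boolean)], so the tuple is destructured rather than calling .head):

case (wd: WithDescriptionTypedPipe[_], rec) =>
  val op = rec(wd.input)
  wd.descriptions match {
    case (desc, _) :: _ => op.withName(desc) // name the op after the first description
    case Nil            => op                // no descriptions, leave the op unnamed
  }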

}
case (SumByLocalKeys(pipe, sg), rec) =>
val op = rec(pipe)
config.getMapSideAggregationThreshold match {
@@ -97,7 +112,10 @@ object BeamPlanner {
uir.evidence.subst[BeamOpT](sortedOp)
}
go(ivsr)
case (ReduceStepPipe(ValueSortedReduce(keyOrdering, pipe, valueSort, reduceFn, _, _)), rec) =>
case (
ReduceStepPipe(ValueSortedReduce(keyOrdering, pipe, valueSort, reduceFn, _, _)),
rec
) =>
val op = rec(pipe)
op.sortedMapGroup(reduceFn)(keyOrdering, valueSort, kryoCoder)
case (ReduceStepPipe(IteratorMappedReduce(keyOrdering, pipe, reduceFn, _, _)), rec) =>
@@ -116,7 +134,7 @@
val ops: Seq[BeamOp[(K, Any)]] = cg.inputs.map(tp => rec(tp))
CoGroupedOp(cg, ops)
}
go(cg)
if (cg.descriptions.isEmpty) go(cg) else go(cg).withName(cg.descriptions.last)
Collaborator:
Why use .last here but .head above?

Contributor Author:
I meant to ask you about this myself. If you look at the test case in this PR, the cogrouped expression has two descriptions, "Count words" and "Join with t1", both of which appear in the descriptions of the CoGrouped. But since descriptions appear to be appended (e.g. in CoGrouped.Pair and the ComposeDescriptions optimization rule), I chose to keep the last one. Hence there is no assertion for "Count words" even in that test case.

Collaborator:
Why not combine all of them? Why not just cg.descriptions.mkString("\n") or something?

Contributor Author:
Yes, let me try this out on a job first. Thanks!
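For reference, the combined-description variant under discussion is a one-line change to the case above (a sketch, keeping every description rather than only the last):

if (cg.descriptions.isEmpty) go(cg)
else go(cg).withName(cg.descriptions.mkString("\n")) // e.g. "Count words\nJoin with t1"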

case (Fork(input), rec) =>
rec(input)
case (m @ MergedTypedPipe(_, _), rec) =>
@@ -137,7 +155,21 @@

def defaultOptimizationRules(config: Config): Seq[Rule[TypedPipe]] = {
def std(forceHash: Rule[TypedPipe]) =
OptimizationRules.standardMapReduceRules :::
List(
Collaborator:
Why the change here? Is this copying the same cascading optimizations?

Contributor Author:
It really is standardMapReduceRules with DescribeLater excluded. I mentioned in the summary of this PR that I chose to exclude that rule in order to capture the line numbers at the right points, without the optimizer changing the AST. I'm curious whether there's a better way to do this, though.

Collaborator:
Sorry, I missed that. Note: the line numbers are already captured by the time that rule runs. The line numbers are collected on the TypedPipe. I don't see how removing that rule helps descriptions.

If I were you, I would look at all the descriptions and add the full list.

The problem with removing that rule is that it will block merging nodes together. It may be fine, maybe Beam will follow up with optimizations, but I would be careful: scalding may do some optimizations that beam doesn't.

Contributor Author (@navinvishy, Apr 10, 2022):
The line numbers are in the descriptions though, if users don't explicitly add them with a call to withDescription.
Here's what I'm trying to do: if we have a TypedPipe that looks something like this:
WithDescription(Mapped(WithDescription(FlatMapped(), "a")), "b")
I'm trying to associate the description "a" with the PTransform corresponding to the FlatMapped and the description "b" with the PTransform corresponding to the Mapped. By default, the "a" and "b" here are line numbers unless the user explicitly added their own descriptions.
If I let DescribeLater run, I think we'll end up with something like this:
WithDescription(FlatMapped(), ["a","b"])
(since this would also compose the Mapped and FlatMapped, and then also run ComposeDescriptions)
This would be hard to debug when there is an error in this stage of the job, since it's hard to map back to the code.
But as I write this, I'm wondering if it's better to just concatenate the line numbers instead...
Dataflow (which is the Beam runner we're using) does apply similar optimizations (I've mentioned this in the summary), but I'm now debating whether it's a good idea to change what the optimizer does, since it definitely makes the optimizer less useful. What do you think?

Collaborator:

I don't really know the answer. So, line numbers are added to descriptions when the user constructs the TypedPipe, see:

private[scalding] def withLine: TypedPipe[T] =

The idea of DescribeLater is to try to merge as many operations into one step and then put all the line numbers/descriptions on that single step. By taking just a single item you are hiding which actions might be combined.

As to what to do, I don't know. If Beam's optimizer is very good, it maybe doesn't matter. Maybe you should try separate runs with a somewhat complex job and compare?

Also, I can imagine a Config setting like scalding.preserve_description_order that skips applying that optimization. Then if you were debugging, you could set it to true, but for a production job you could set it to false.

I would probably bias to just combining all the descriptions into a single beam description unless you actually see problems. You can always come back and add that setting. I would personally bias to maximizing the utility of the optimizer.

Contributor Author:

Thanks. Let me try some jobs with the rule enabled and all the descriptions shown.
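A sketch of the Config toggle floated above; the key scalding.preserve_description_order is hypothetical (taken from the comment) and does not exist in scalding today:

import com.twitter.scalding.Config
import com.twitter.scalding.dagon.Rule
import com.twitter.scalding.typed.{OptimizationRules, TypedPipe}

// Include DescribeLater only when description order need not be preserved:
// a debugging run keeps one description per node, while a production run
// keeps the full set of optimizations.
def describeRules(config: Config): List[Rule[TypedPipe]] =
  if (config.get("scalding.preserve_description_order").contains("true")) Nil
  else List(OptimizationRules.DescribeLater)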

// phase 0, add explicit forks to not duplicate pipes on fanout below
AddExplicitForks,
RemoveUselessFork,
// phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
IgnoreNoOpGroup.orElse(composeSame).orElse(FilterKeysEarly).orElse(DeferMerge),
// phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
composeIntoFlatMap
.orElse(simplifyEmpty)
.orElse(DiamondToFlatMap)
.orElse(ComposeDescriptions)
.orElse(MapValuesInReducers),
// phase 3, remove duplicates forces/forks (e.g. .fork.fork or .forceToDisk.fork, ....)
RemoveDuplicateForceFork
) :::
List(
OptimizationRules.FilterLocally, // after filtering, we may have filtered to nothing, lets see
OptimizationRules.simplifyEmpty,
@@ -6,9 +6,10 @@ import com.twitter.scalding.Config
import com.twitter.scalding.beam_backend.BeamFunctions._
import com.twitter.scalding.beam_backend.BeamJoiner.MultiJoinFunction
import com.twitter.scalding.serialization.Externalizer
import com.twitter.scalding.typed.Input
import com.twitter.scalding.typed.functions.ComposedFunctions.ComposedMapGroup
import com.twitter.scalding.typed.functions.{EmptyGuard, MapValueStream, ScaldingPriorityQueueMonoid, SumAll}
import com.twitter.scalding.typed.{CoGrouped, Input}
import com.twitter.scalding.typed.{CoGrouped, TypedSource}
Collaborator:
Why the change back from Input?

(I think I know the answer: you all are actually using a fork... I really hope you will get onto the mainline branch. I am contributing my time to help you, I hope you will contribute your energy to help others by actually using the open source version).

Contributor Author:

I didn't really change it back... this was just a missed import, but you're right that we're using a fork. Thank you for investing your time in this, it really is much appreciated! I have it on my todo list to get us onto the mainline branch. I do need to go through your earlier refactor more closely, though.

Collaborator:
I don't follow. I changed this code to use Input, which is a subclass of and replacement for TypedSource. It seems the diff changes it back from Input to TypedSource. Can you restate what you meant by this being a "missed import"?

Contributor Author:

Sorry, I wasn't clear. This change is now outdated - I updated it to remove the reference to TypedSource. It was there in the first place because I brought this file over from our fork but hadn't removed unused imports, which is why it showed up. You should see no reference to TypedSource now in the diff.

import java.util.{Comparator, PriorityQueue}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.{Coder, IterableCoder, KvCoder}
@@ -55,6 +56,8 @@ sealed abstract class BeamOp[+A] {

def flatMap[B](f: A => TraversableOnce[B])(implicit kryoCoder: KryoCoder): BeamOp[B] =
parDo(FlatMapFn(f), "flatMap")

def withName(name: String): BeamOp[A]
}

private final case class SerializableComparator[T](comp: Comparator[T]) extends Comparator[T] {
@@ -136,11 +139,16 @@ object BeamOp extends Serializable {
)
case Some(src) => src.read(pipeline, conf)
}

override def withName(name: String): BeamOp[A] = this
}

final case class FromIterable[A](iterable: Iterable[A], kryoCoder: KryoCoder) extends BeamOp[A] {
final case class FromIterable[A](iterable: Iterable[A], kryoCoder: KryoCoder, name: Option[String] = None)
extends BeamOp[A] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: A] =
pipeline.apply(Create.of(iterable.asJava).withCoder(kryoCoder))
pipeline.apply(name.getOrElse("Iterable source"), Create.of(iterable.asJava).withCoder(kryoCoder))

override def withName(name: String): BeamOp[A] = FromIterable(iterable, kryoCoder, Some(name))
}

final case class TransformBeamOp[A, B](
@@ -153,6 +161,8 @@
val pCollection: PCollection[A] = widenPCollection(source.run(pipeline))
pCollection.apply(name, f).setCoder(kryoCoder)
}

override def withName(desc: String): BeamOp[B] = TransformBeamOp(source, f, kryoCoder, desc)
}

final case class HashJoinTransform[K, V, U, W](
@@ -184,7 +194,8 @@
final case class HashJoinOp[K, V, U, W](
left: BeamOp[(K, V)],
right: BeamOp[(K, U)],
joiner: (K, V, Iterable[U]) => Iterator[W]
joiner: (K, V, Iterable[U]) => Iterator[W],
name: Option[String] = None
)(implicit kryoCoder: KryoCoder, ordK: Ordering[K])
extends BeamOp[(K, W)] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: (K, W)] = {
@@ -199,20 +210,28 @@
widenPCollection(rightPCollection): PCollection[(K, _)]
)

tuple.apply(HashJoinTransform(keyCoder, joiner))
tuple.apply(name.getOrElse("HashJoin"), HashJoinTransform(keyCoder, joiner))
}

override def withName(name: String): BeamOp[(K, W)] = HashJoinOp(left, right, joiner, Some(name))
}

final case class MergedBeamOp[A](first: BeamOp[A], second: BeamOp[A], tail: Seq[BeamOp[A]])
extends BeamOp[A] {
final case class MergedBeamOp[A](
first: BeamOp[A],
second: BeamOp[A],
tail: Seq[BeamOp[A]],
name: Option[String] = None
) extends BeamOp[A] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: A] = {
val collections = PCollectionList
.of(widenPCollection(first.run(pipeline)): PCollection[A])
.and(widenPCollection(second.run(pipeline)): PCollection[A])
.and(tail.map(op => widenPCollection(op.run(pipeline)): PCollection[A]).asJava)

collections.apply(Flatten.pCollections[A]())
collections.apply(name.getOrElse("Merge"), Flatten.pCollections[A]())
}

override def withName(name: String): BeamOp[A] = MergedBeamOp(first, second, tail, Some(name))
}

final case class CoGroupedTransform[K, V](
@@ -241,7 +260,8 @@

final case class CoGroupedOp[K, V](
cg: CoGrouped[K, V],
inputOps: Seq[BeamOp[(K, Any)]]
inputOps: Seq[BeamOp[(K, Any)]],
name: Option[String] = None
)(implicit kryoCoder: KryoCoder)
extends BeamOp[(K, V)] {
override def runNoCache(pipeline: Pipeline): PCollection[_ <: (K, V)] = {
@@ -256,8 +276,10 @@

PCollectionList
.of(pcols.asJava)
.apply(CoGroupedTransform(joinFunction, tupleTags, keyCoder))
.apply(name.getOrElse("CoGrouped"), CoGroupedTransform(joinFunction, tupleTags, keyCoder))
}

override def withName(name: String): BeamOp[(K, V)] = CoGroupedOp(cg, inputOps, Some(name))
}

final case class CoGroupDoFn[K, V](
@@ -1,13 +1,23 @@
package com.twitter.scalding.beam_backend

import com.twitter.scalding.dagon.Rule
import com.twitter.algebird.{AveragedValue, Semigroup}
import com.twitter.scalding.Execution.ToWrite
import com.twitter.scalding.Execution.ToWrite.SimpleWrite
import com.twitter.scalding.TypedTsv
import com.twitter.scalding.beam_backend.BeamOp.{CoGroupedOp, FromIterable, HashJoinOp, MergedBeamOp}
import com.twitter.scalding.{Config, Execution, TextLine, TypedPipe}
import java.io.File
import java.nio.file.Paths
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.Pipeline.PipelineVisitor
import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory}
import org.apache.beam.sdk.runners.TransformHierarchy
import org.apache.beam.sdk.values.PValue
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.immutable
import scala.collection.mutable
import scala.io.Source

class BeamBackendTests extends FunSuite with BeforeAndAfter {
@@ -473,6 +483,42 @@ class BeamBackendTests extends FunSuite with BeforeAndAfter {
assert(output.toSet == Seq((5, 3), (10, 3)).toSet)
}

test("BeamOp naming: named PTransforms") {
class TransformNameVisitor extends PipelineVisitor.Defaults {
private var transformNames: mutable.Set[String] = mutable.Set[String]()

override def visitPrimitiveTransform(node: TransformHierarchy#Node): Unit =
transformNames.add(node.getFullName)

def getTransformNames(): mutable.Set[String] = transformNames
}

case class WordCount(a: String, b: Long)

val t1 = TypedPipe.from(Seq(("a", 1L), ("b", 2L)))
val pipe = TypedPipe
.from(Seq("the quick brown fox jumps"))
.withDescription("Read data")
.flatMap(s => s.split(" "))
.withDescription("Convert to words")
.map(tag => (tag, 1L))
.sumByKey
.withDescription("Count words")
.join(t1)
.withDescription("Join with t1")
.map(keyval => WordCount(keyval._1, keyval._2._1)) ++ t1
val (pipeline, op) = beamUnoptimizedPlan(pipe)
op.run(pipeline)
val visitor = new TransformNameVisitor()
pipeline.traverseTopologically(visitor)
val names = visitor.getTransformNames()
assert(
names.exists(_.contains("Read data")) && names.exists(_.contains("Convert to words")) && names.exists(
_.contains("Join with t1")
)
)
}

private def getContents(path: String, prefix: String): List[String] =
new File(path).listFiles.flatMap { file =>
if (file.getPath.startsWith(prefix)) {