-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-6775: Fix the issue of without init super class's #4859
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR.
I just double checked the code, and it seems that AbstractProcessor
is use on other places, too. Did you double check those, if super.init(context)
is called there?
It might also make sense to make AbstractProcessor#context
protected?
@@ -609,6 +609,7 @@ private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch | |||
|
|||
@Override | |||
public void init(ProcessorContext context) { | |||
super.init(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be better to remove the overwrite method completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, if we don't care about the context, we could just use Processor
directly instead of AbstractProcessor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestions! After saw this snippet:
kafka/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
Lines 45 to 50 in 886daf5
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() { | |
@Override | |
public void process(final String key, final Long value) { | |
context().forward(key + value, key.length() + value); | |
} | |
}; |
I wonder how about only keeping
process
this overwrite method and removing the other overwrite methods which include init
, punctuate
, and close
? I think it is much clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me.
@@ -648,6 +649,7 @@ private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLat | |||
return new AbstractProcessor<Integer, byte[]>() { | |||
@Override | |||
public void init(ProcessorContext context) { | |||
super.init(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
@@ -48,6 +48,7 @@ | |||
@Override | |||
public void init(final ProcessorContext context) { | |||
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId()); | |||
super.init(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: better to do this as first instruction?
Thanks for the reviewing and suggestion!
If this meant other anonymous classes of
I am not sure about this since there is |
# Conflicts: # streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
LGTM! |
Some anonymous classes of AbstractProcessor didn't initialize their superclass. This will not set up ProcessorContext context at AbstractProcessor. Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
Some anonymous classes of AbstractProcessor didn't initialize their superclass. This will not set up ProcessorContext context at AbstractProcessor. Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
Some anonymous classes of
AbstractProcessor
didn't initialize their superclass. This will not set upProcessorContext context
atAbstractProcessor
.Committer Checklist (excluded from commit message)