-
Notifications
You must be signed in to change notification settings - Fork 6
Build your own SPQR Source
This tutorial will show you how to implement, annotated and deploy your own source component. To keep the example as simple as possible, you will see how to write a random number generator that pushes data into a pipeline. You can find the complete listing in our repository.
Like all component implementations you are requested to provide getter and setter methods for accessing the component identifier and its type. Additionally, each source must provide a setter for incoming message callbacks.
Each component provides two lifecycle methods that are - according to their name - invoked on component initialization and during its shutdown.
The initialize method does .. what it's name says: it may be used to initialize the component instance. It receives all key/value pairs provided to the component configuration which lives inside a pipeline definition. In this case, we parse a value from the properties that set the number of values to generate. Additionally, the caller may provide an optional seed applied to the random number generator.
public void initialize(Properties properties) throws
RequiredInputMissingException, ComponentInitializationFailedException {
this.maxNumGenerated = Integer.parseInt(StringUtils.trim(
properties.getProperty(CFG_MAX_NUM_GENERATED)));
try {
this.seed = Long.parseLong(
StringUtils.trim(properties.getProperty(CFG_SEED))
);
} catch(Exception e) {
//
}
this.content = (properties.containsKey(CFG_CONTENT) ?
properties.getProperty(CFG_CONTENT).getBytes() :
new byte[0]);
this.running = true;
}
The shutdown method is invoked by the surrounding micro pipeline. Typically this happens when the micro pipeline itself is shut down. But in case the micro pipeline tries to handle error situations on its own it may shut down and restart selected components on its own.
When the shutdown method get triggered it must ensure that all consumed resources are freed and notified about the shutdown.
In the case of our random number generator, the shutdown method looks like this:
public boolean shutdown() {
this.running = false;
return true;
}
Each source implementation requires the implementation of java.lang.Runnable#run as it get's executed in its own thread to keep things asynchronously and in parallel.
For the current case the implementation looks as follows:
public void run() {
Random rnd = null;
if(seed > 0)
rnd = new Random(seed);
else
rnd = new Random();
int count = 0;
long s1 = System.currentTimeMillis();
while(running && maxNumGenerated != 0) {
count ++;
this.callback.onMessage(new StreamingDataMessage(this.content, System.currentTimeMillis()));
maxNumGenerated--;
}
}
SPQR - stream processing and querying in realtime by Otto Group