Description
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
As soon as I edit the provided window example (https://github.com/robinhood/faust/blob/master/examples/windowed_aggregation.py) to use RocksDB, the behavior changes: the `value()` method always returns an empty list.
(The code is essentially the same as the example; I just shortened it for brevity.)
```python
from datetime import datetime, timedelta
from time import time
import random

import faust


class RawModel(faust.Record):
    date: datetime
    value: float


TOPIC = 'raw-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'
CLEANUP_INTERVAL = 1.0
WINDOW = 10
WINDOW_EXPIRES = 10
PARTITIONS = 1

# store='rocksdb://' is the change that triggers the problem;
# with the default in-memory store the output is as expected.
app = faust.App('windowed-agg', broker=KAFKA, version=1, topic_partitions=1,
                store='rocksdb://')
app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)


def window_processor(key, events):
    # Called when a window closes; should receive every event in that window.
    print(f'window_processor - events: {len(events)}')


tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(RawModel.date)
)


@app.agent(source)
async def print_windowed_events(stream):
    async for event in stream:
        # Read the current window's list, append the event, write it back.
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events before: {value_list}')
        print(event)
        value_list.append(event)
        tumbling_table['events'] = value_list
        value_list = tumbling_table['events'].value()
        print(f'print_windowed_events after: {value_list}')


@app.timer(0.1)
async def produce():
    await source.send(value=RawModel(value=random.random(), date=int(time())))


if __name__ == '__main__':
    app.main()
```
Expected behavior
I would expect the same output as with the in-memory store:
```
[2020-01-10 14:52:49,442] [10873] [WARNING] print_windowed_events before: []
[2020-01-10 14:52:49,442] [10873] [WARNING] <RawModel: date=1578664368, value=0.3634263945834183>
[2020-01-10 14:52:49,443] [10873] [WARNING] print_windowed_events after: [<RawModel: date=1578664368, value=0.3634263945834183>]
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events before: [<RawModel: date=1578664368, value=0.3634263945834183>]
[2020-01-10 14:52:49,943] [10873] [WARNING] <RawModel: date=1578664369, value=0.4364065026849575>
[2020-01-10 14:52:49,943] [10873] [WARNING] print_windowed_events after: [<RawModel: date=1578664368, value=0.3634263945834183>, <RawModel: date=1578664369, value=0.4364065026849575>]
```
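Why both events should end up in the same list: with `WINDOW = 10` and windows taken relative to `RawModel.date`, the two event dates in the log above fall into one 10-second tumbling window. This is a plain-Python sketch, not Faust's internal code; it assumes simple epoch-aligned windows of the form [start, start + size), and `window_start` is a hypothetical helper.

```python
WINDOW = 10  # window size in seconds, as in the reproduction script


def window_start(timestamp: int, size: int) -> int:
    """Hypothetical helper: start of the epoch-aligned tumbling window
    containing `timestamp` (simplified; not Faust's own implementation)."""
    return timestamp - timestamp % size


# Event dates taken from the expected-output log.
timestamps = [1578664368, 1578664369]
ranges = [(window_start(ts, WINDOW), window_start(ts, WINDOW) + WINDOW)
          for ts in timestamps]
print(ranges)  # [(1578664360, 1578664370), (1578664360, 1578664370)]
```

Both dates map to the same window, so the second read should return the list that already contains the first event.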
Actual behavior
The `value()` method always returns an empty list, even immediately after I have added an element. As a result, `window_processor` only ever receives one event instead of all the events in the window.
```
[2020-01-10 14:51:52,332] [10833] [WARNING] print_windowed_events before: []
[2020-01-10 14:51:52,333] [10833] [WARNING] <RawModel: date=1578664311, value=0.1627785852779441>
[2020-01-10 14:51:52,333] [10833] [WARNING] print_windowed_events after: []
[2020-01-10 14:51:52,849] [10833] [WARNING] print_windowed_events before: []
[2020-01-10 14:51:52,849] [10833] [WARNING] <RawModel: date=1578664312, value=0.5614135995691765>
[2020-01-10 14:51:52,850] [10833] [WARNING] print_windowed_events after: []
```
Let me know if I am missing something. I also tested python-rocksdb on its own, and it seems to work.
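The consequence described under "Actual behavior" can be modelled without Kafka or Faust. The agent does a read-modify-write on the window's list, so if every read returns an empty list, each write replaces the stored list with just the current event. `accumulate` and its `read` parameter are hypothetical names for this sketch, and strings stand in for `RawModel` events.

```python
from typing import Callable, List


def accumulate(events: List[str],
               read: Callable[[List[str]], List[str]]) -> List[str]:
    """Mimic the agent loop: read the stored list, append, write back.

    `read` stands in for `tumbling_table['events'].value()`.
    """
    stored: List[str] = []
    for event in events:
        value_list = list(read(stored))  # copy of whatever the read returns
        value_list.append(event)
        stored = value_list              # like tumbling_table['events'] = value_list
    return stored


events = ['e1', 'e2', 'e3']
# Read returns what was last written (the in-memory behaviour).
expected = accumulate(events, read=lambda stored: stored)
# Read always returns [] (the behaviour reported with RocksDB).
actual = accumulate(events, read=lambda stored: [])
print(expected)  # ['e1', 'e2', 'e3']
print(actual)    # ['e3']
```

When the window closes, the stored list therefore holds only the most recent event, which matches `window_processor` reporting a single event.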
Versions
- Python version: 3.7.4
- Faust version: 1.9.0
- Operating system: macOS Catalina (10.15.2)
- Kafka version: Confluent 5.3.0-ccs (Commit:a8eb7a79910d0f1a)
- RocksDB version (if applicable): python-rocksdb (0.7.0)
Comment
Thanks a lot for any suggestions on this!