Skip to content

Commit 9c7daca

Browse files
authored
Minor bug fixes in zstream (#1364)
1 parent 760e698 commit 9c7daca

File tree

3 files changed

+32
-19
lines changed
  • incubator

3 files changed

+32
-19
lines changed

incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,13 @@ write zilla:data.ext ${pgsql:dataEx()
145145
write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
146146
" SELECT\n"
147147
" c.correlation_id,\n"
148+
" c.owner_id,\n"
149+
" c.created_at,\n"
148150
" CASE\n"
149151
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
150152
" ELSE 'PaymentDeclined'\n"
151153
" END AS event, c.user_id, c.amount\n"
152-
" FROM app_events_commands c\n"
154+
" FROM zb_catalog.app_events_commands c\n"
153155
" JOIN balance ON WHERE user_id = c.user_id\n"
154156
" WHERE c.command = 'SendPayment';"
155157
[0x00]
@@ -174,7 +176,7 @@ write zilla:data.ext ${pgsql:dataEx()
174176
.build()
175177
.build()}
176178
write "CREATE MATERIALIZED VIEW public.app_events AS\n"
177-
" SELECT * FROM send_payment_handler;"
179+
" SELECT * FROM zb_catalog.send_payment_handler;"
178180
[0x00]
179181

180182
read advised zilla:flush ${pgsql:flushEx()
@@ -248,12 +250,12 @@ write "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
248250
" SELECT\n"
249251
" COALESCE(r.status, '400') AS status,\n"
250252
" c.correlation_id\n"
251-
" FROM app_events_commands c\n"
252-
" LEFT JOIN app_events_reply_handler r\n"
253+
" FROM zb_catalog.app_events_commands c\n"
254+
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
253255
" ON c.correlation_id = r.correlation_id\n"
254256
"WITH (\n"
255257
" connector = 'kafka',\n"
256-
" topic = 'public.app_events_replies',\n"
258+
" topic = 'public.app_events_replies_sink',\n"
257259
" properties.bootstrap.server = 'localhost:9092',\n"
258260
") FORMAT PLAIN ENCODE AVRO (\n"
259261
" force_append_only='true',\n"

incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,13 @@ read zilla:data.ext ${pgsql:dataEx()
147147
read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
148148
" SELECT\n"
149149
" c.correlation_id,\n"
150+
" c.owner_id,\n"
151+
" c.created_at,\n"
150152
" CASE\n"
151153
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
152154
" ELSE 'PaymentDeclined'\n"
153155
" END AS event, c.user_id, c.amount\n"
154-
" FROM app_events_commands c\n"
156+
" FROM zb_catalog.app_events_commands c\n"
155157
" JOIN balance ON WHERE user_id = c.user_id\n"
156158
" WHERE c.command = 'SendPayment';"
157159
[0x00]
@@ -176,7 +178,7 @@ read zilla:data.ext ${pgsql:dataEx()
176178
.build()
177179
.build()}
178180
read "CREATE MATERIALIZED VIEW public.app_events AS\n"
179-
" SELECT * FROM send_payment_handler;"
181+
" SELECT * FROM zb_catalog.send_payment_handler;"
180182
[0x00]
181183

182184
write advise zilla:flush ${pgsql:flushEx()
@@ -250,12 +252,12 @@ read "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
250252
" SELECT\n"
251253
" COALESCE(r.status, '400') AS status,\n"
252254
" c.correlation_id\n"
253-
" FROM app_events_commands c\n"
254-
" LEFT JOIN app_events_reply_handler r\n"
255+
" FROM zb_catalog.app_events_commands c\n"
256+
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
255257
" ON c.correlation_id = r.correlation_id\n"
256258
"WITH (\n"
257259
" connector = 'kafka',\n"
258-
" topic = 'public.app_events_replies',\n"
260+
" topic = 'public.app_events_replies_sink',\n"
259261
" properties.bootstrap.server = 'localhost:9092',\n"
260262
") FORMAT PLAIN ENCODE AVRO (\n"
261263
" force_append_only='true',\n"

incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,9 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr
278278
CREATE MATERIALIZED VIEW %s.%s AS
279279
SELECT
280280
c.correlation_id,
281+
%s,
281282
%s
282-
FROM %s_commands c
283+
FROM %s.%s_commands c
283284
%s
284285
WHERE %s;\u0000""";
285286

@@ -316,6 +317,11 @@ public void onStarted(
316317
}
317318
}
318319

320+
String include = command.columns().stream()
321+
.filter(c -> ZILLA_MAPPINGS.containsKey(c.generatedAlways()))
322+
.map(c -> "c.%s".formatted(c.name()))
323+
.collect(Collectors.joining(",\n "));
324+
319325
String commandName = command.commandHandlers().entrySet().stream()
320326
.filter(e -> e.getValue().equals(name))
321327
.map(Map.Entry::getKey)
@@ -324,7 +330,8 @@ public void onStarted(
324330

325331
String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName);
326332

327-
String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, from, join, where);
333+
String sqlQuery = String.format(sqlFormat, systemSchema, name, include, columns,
334+
systemSchema, from, join, where);
328335

329336
handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
330337
}
@@ -370,8 +377,8 @@ public void onStarted(
370377
long authorization)
371378
{
372379
String selects = command.commandHandlers().values().stream()
373-
.map("SELECT * FROM %s"::formatted)
374-
.collect(Collectors.joining("UNION ALL\n"));
380+
.map(value -> String.format("SELECT * FROM %s.%s", systemSchema, value))
381+
.collect(Collectors.joining("\nUNION ALL\n"));
375382

376383
String sqlQuery = String.format(sqlFormat, command.schema(), command.name(), selects);
377384

@@ -534,7 +541,7 @@ public RisingwaveMacroState onError(
534541
private final class CreateReplySink implements RisingwaveMacroState
535542
{
536543
private final String sqlFormat = """
537-
CREATE SINK %s.%s_replies_sink AS
544+
CREATE SINK %s_replies_sink AS
538545
SELECT
539546
COALESCE(r.status, '400') AS status,
540547
c.correlation_id
@@ -543,7 +550,7 @@ private final class CreateReplySink implements RisingwaveMacroState
543550
ON c.correlation_id = r.correlation_id
544551
WITH (
545552
connector = 'kafka',
546-
topic = '%s.%s_replies',
553+
topic = '%s.%s_replies_sink',
547554
properties.bootstrap.server = '%s',
548555
) FORMAT PLAIN ENCODE AVRO (
549556
force_append_only='true',
@@ -555,10 +562,12 @@ public void onStarted(
555562
long traceId,
556563
long authorization)
557564
{
558-
String name = command.name();
559-
String schema = command.schema();
565+
final String name = command.name();
566+
final String schema = command.schema();
567+
568+
final String systemName = "%s.%s".formatted(systemSchema, name);
560569

561-
String sqlQuery = String.format(sqlFormat, systemSchema, name, name, name,
570+
String sqlQuery = String.format(sqlFormat, systemName, systemName, systemName,
562571
schema, name, bootstrapServer, schemaRegistry);
563572

564573
handler.doExecuteSystemClient(traceId, authorization, sqlQuery);

0 commit comments

Comments
 (0)