Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ public void shouldReadAndWriteIntegersInNetworkByteOrder() throws Exception
k3po.finish();
}

@Test
@Specification({
"timestamps.false/client",
"timestamps.false/server"
})
public void shouldConnectWithoutTimestamps() throws Exception
{
k3po.finish();
}

@Test
@Specification({
"client.sent.data/client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_SHARED_WINDOW;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_STREAM_ID;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_THROTTLE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TIMESTAMPS;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TRANSMISSION;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_UPDATE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_WINDOW;
Expand All @@ -49,11 +50,13 @@ public class DefaultZillaChannelConfig extends DefaultChannelConfig implements Z
private ZillaUpdateMode update = ZillaUpdateMode.STREAM;
private long affinity;
private byte capabilities;
private boolean timestamps;

public DefaultZillaChannelConfig()
{
super();
setBufferFactory(NATIVE_BUFFER_FACTORY);
setTimestamps(true);
}

@Override
Expand Down Expand Up @@ -189,6 +192,19 @@ public byte getCapabilities()
return capabilities;
}

@Override
public void setTimestamps(
boolean timestamps)
{
this.timestamps = timestamps;
}

@Override
public boolean hasTimestamps()
{
return timestamps;
}

@Override
protected boolean setOption0(
String key,
Expand Down Expand Up @@ -242,6 +258,10 @@ else if (OPTION_CAPABILITIES.getName().equals(key))
{
setCapabilities(convertToByte(value));
}
else if (OPTION_TIMESTAMPS.getName().equals(key))
{
setTimestamps(Boolean.parseBoolean(Objects.toString(value, "false")));
}
else
{
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_SHARED_WINDOW;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_STREAM_ID;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_THROTTLE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TIMESTAMPS;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TRANSMISSION;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_UPDATE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_WINDOW;
Expand All @@ -48,11 +49,13 @@ public class DefaultZillaServerChannelConfig extends DefaultServerChannelConfig
private ZillaUpdateMode update = ZillaUpdateMode.STREAM;
private long affinity;
private byte capabilities;
private boolean timestamps;

public DefaultZillaServerChannelConfig()
{
super();
setBufferFactory(NATIVE_BUFFER_FACTORY);
setTimestamps(true);
}

@Override
Expand Down Expand Up @@ -189,6 +192,19 @@ public byte getCapabilities()
return capabilities;
}

@Override
public void setTimestamps(
boolean timestamps)
{
this.timestamps = timestamps;
}

@Override
public boolean hasTimestamps()
{
return timestamps;
}

@Override
protected boolean setOption0(
String key,
Expand Down Expand Up @@ -239,6 +255,10 @@ else if (OPTION_CAPABILITIES.getName().equals(key))
{
setCapabilities(convertToByte(value));
}
else if (OPTION_TIMESTAMPS.getName().equals(key))
{
setTimestamps(Boolean.parseBoolean(Objects.toString(value, "false")));
}
else
{
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public String toString()
return String.format("%s [sourceId=%d, targetId=%d]", description, sourceId, targetId);
}

public long timestamp()
{
return getConfig().hasTimestamps() ? System.nanoTime() : 0L;
}

public void acknowledgeBytes(
int reserved)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_REPLY_TO;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_STREAM_ID;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_THROTTLE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TIMESTAMPS;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_TRANSMISSION;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_UPDATE;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.types.ZillaTypeSystem.OPTION_WINDOW;
Expand Down Expand Up @@ -75,7 +76,7 @@ protected ChannelAddress newChannelAddress0(

Collection<TypeInfo<?>> allOptionTypes = asList(OPTION_EPHEMERAL, OPTION_REPLY_TO, OPTION_WINDOW, OPTION_BUDGET_ID,
OPTION_STREAM_ID, OPTION_PADDING, OPTION_UPDATE, OPTION_AUTHORIZATION, OPTION_THROTTLE,
OPTION_TRANSMISSION, OPTION_BYTE_ORDER);
OPTION_TRANSMISSION, OPTION_BYTE_ORDER, OPTION_TIMESTAMPS);
for (TypeInfo<?> optionType : allOptionTypes)
{
if (options != null && options.containsKey(optionType.getName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ public interface ZillaChannelConfig extends ChannelConfig
void setCapabilities(byte capabilities);

byte getCapabilities();

void setTimestamps(boolean timestamps);

boolean hasTimestamps();
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private ZillaScope newScope(
int scopeIndex)
{
ZillaScope scope = new ZillaScope(config, labels, scopeIndex, this::lookupTargetIndex,
System::nanoTime, traceIds::incrementAndGet);
traceIds::incrementAndGet);
this.scopes = ArrayUtil.add(this.scopes, scope);
return scope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ private ZillaChildChannel doAccept(
childConfig.setPadding(serverConfig.getPadding());
childConfig.setAlignment(serverConfig.getAlignment());
childConfig.setCapabilities(serverConfig.getCapabilities());
childConfig.setTimestamps(serverConfig.hasTimestamps());

if (childConfig.getTransmission() == SIMPLEX)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public final class ZillaScope implements AutoCloseable
private final Long2ObjectHashMap<MessageHandler> throttlesById;
private final Long2ObjectHashMap<ZillaCorrelation> correlations;
private final ToIntFunction<Long> lookupTargetIndex;
private final LongSupplier supplyTimestamp;
private final LongSupplier supplyTraceId;
private final ZillaSource source;

Expand All @@ -68,7 +67,6 @@ public ZillaScope(
LabelManager labels,
int scopeIndex,
ToIntFunction<Long> lookupTargetIndex,
LongSupplier supplyTimestamp,
LongSupplier supplyTraceId)
{
this.config = config;
Expand All @@ -81,7 +79,6 @@ public ZillaScope(
this.targetsByIndex = new Int2ObjectHashMap<>();
this.debitorsByIndex = new Int2ObjectHashMap<>();
this.lookupTargetIndex = lookupTargetIndex;
this.supplyTimestamp = supplyTimestamp;
this.supplyTraceId = supplyTraceId;
this.source = new ZillaSource(config, scopeIndex, supplyTraceId,
correlations::remove, this::supplySender, this::supplyTarget,
Expand Down Expand Up @@ -365,7 +362,7 @@ private ZillaTarget newTarget(

final ZillaTarget target = new ZillaTarget(source.scopeIndex(), targetPath, layout, writeBuffer,
throttlesById::put, throttlesById::remove, correlations::put,
supplyTimestamp, supplyTraceId);
supplyTraceId);

this.targets = ArrayUtil.add(this.targets, target);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void doChallenge(
final int maximum = channel.sourceMax();

final ZillaTarget sender = supplySender.apply(streamId);
sender.doChallenge(originId, routedId, streamId, sequence, acknowledge, traceId, maximum, challengeExt);
sender.doChallenge(channel, originId, routedId, streamId, sequence, acknowledge, traceId, maximum, challengeExt);
}

public MessageHandler newStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ final class ZillaTarget implements AutoCloseable
private final LongConsumer unregisterThrottle;
private final MutableDirectBuffer writeBuffer;
private final LongObjectBiConsumer<ZillaCorrelation> correlateNew;
private final LongSupplier supplyTimestamp;
private final LongSupplier supplyTraceId;

ZillaTarget(
Expand All @@ -115,7 +114,6 @@ final class ZillaTarget implements AutoCloseable
LongObjectBiConsumer<MessageHandler> registerThrottle,
LongConsumer unregisterThrottle,
LongObjectBiConsumer<ZillaCorrelation> correlateNew,
LongSupplier supplyTimestamp,
LongSupplier supplyTraceId)
{
this.scopeIndex = scopeIndex;
Expand All @@ -127,7 +125,6 @@ final class ZillaTarget implements AutoCloseable
this.registerThrottle = registerThrottle;
this.unregisterThrottle = unregisterThrottle;
this.correlateNew = correlateNew;
this.supplyTimestamp = supplyTimestamp;
this.supplyTraceId = supplyTraceId;
}

Expand Down Expand Up @@ -270,7 +267,7 @@ public void doConnect(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(client.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(authorization)
.affinity(affinity)
Expand Down Expand Up @@ -334,14 +331,14 @@ public void operationComplete(
}

public void doConnectAbort(
ZillaClientChannel clientChannel)
ZillaClientChannel client)
{
final long originId = clientChannel.originId();
final long routedId = clientChannel.routedId();
final long initialId = clientChannel.targetId();
final long sequence = clientChannel.targetSeq();
final long acknowledge = clientChannel.targetAck();
final int maximum = clientChannel.targetMax();
final long originId = client.originId();
final long routedId = client.routedId();
final long initialId = client.targetId();
final long sequence = client.targetSeq();
final long acknowledge = client.targetAck();
final int maximum = client.targetMax();

final AbortFW abort = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
Expand All @@ -350,7 +347,7 @@ public void doConnectAbort(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(client.timestamp())
.traceId(supplyTraceId.getAsLong())
.build();

Expand Down Expand Up @@ -390,7 +387,7 @@ public void doBeginReply(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.affinity(affinity)
.extension(p -> p.set(beginExtCopy))
Expand Down Expand Up @@ -495,7 +492,7 @@ private void doAdviseOutputFlush(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(authorization)
.budgetId(budgetId)
Expand Down Expand Up @@ -535,7 +532,7 @@ public void doAbortOutput(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(authorization)
.extension(p -> p.set(abortExtCopy))
Expand Down Expand Up @@ -585,7 +582,7 @@ public void doShutdownOutput(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(authorization)
.extension(p -> p.set(endExtCopy))
Expand Down Expand Up @@ -631,7 +628,7 @@ public void doClose(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(channel.targetMax())
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(channel.targetAuth())
.extension(p -> p.set(endExtCopy))
Expand Down Expand Up @@ -767,7 +764,7 @@ private boolean flushData(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.authorization(authorization)
.flags(flags)
Expand Down Expand Up @@ -839,7 +836,7 @@ void doWindow(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.budgetId(budgetId)
.padding(padding)
Expand Down Expand Up @@ -880,7 +877,6 @@ void doReset(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.traceId(traceId)
.build();

Expand Down Expand Up @@ -908,7 +904,7 @@ void doAbortInput(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(traceId)
.extension(p -> p.set(extensionCopy))
.build();
Expand All @@ -917,6 +913,7 @@ void doAbortInput(
}

void doChallenge(
final ZillaChannel channel,
final long originId,
final long routedId,
final long streamId,
Expand All @@ -935,7 +932,7 @@ void doChallenge(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(traceId)
.extension(p -> p.set(extensionCopy))
.build();
Expand Down Expand Up @@ -1128,7 +1125,7 @@ private void onHandshakeCompleted(
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.timestamp(supplyTimestamp.getAsLong())
.timestamp(channel.timestamp())
.traceId(supplyTraceId.getAsLong())
.build();

Expand Down
Loading