Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public ConnectionClientTransport newClientTransport(
if (closed) {
throw new IllegalStateException("The transport factory is closed.");
}
return new InProcessTransport(name);
return new InProcessTransport(name, authority);
}

@Override
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {

private final LogId logId = LogId.allocate(getClass().getName());
private final String name;
private final String authority;
private ServerTransportListener serverTransportListener;
private Attributes serverStreamAttributes;
private ManagedClientTransport.Listener clientTransportListener;
Expand All @@ -84,7 +85,12 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {
private Set<InProcessStream> streams = new HashSet<InProcessStream>();

public InProcessTransport(String name) {
this(name, null);
}

public InProcessTransport(String name, String authority) {
this.name = name;
this.authority = authority;
}

@CheckReturnValue
Expand Down Expand Up @@ -138,7 +144,7 @@ public void start(ClientStreamListener listener) {
}
StatsTraceContext serverStatsTraceContext = serverTransportListener.methodDetermined(
method.getFullMethodName(), headers);
return new InProcessStream(method, headers, serverStatsTraceContext).clientStream;
return new InProcessStream(method, headers, serverStatsTraceContext, authority).clientStream;
}

@Override
Expand Down Expand Up @@ -236,13 +242,15 @@ private class InProcessStream {
private final StatsTraceContext serverStatsTraceContext;
private final Metadata headers;
private final MethodDescriptor<?, ?> method;
private volatile String authority;

private InProcessStream(MethodDescriptor<?, ?> method, Metadata headers,
StatsTraceContext serverStatsTraceContext) {
StatsTraceContext serverStatsTraceContext, String authority) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.serverStatsTraceContext =
checkNotNull(serverStatsTraceContext, "serverStatsTraceContext");
this.authority = authority;
}

// Can be called multiple times due to races on both client and server closing at same time.
Expand Down Expand Up @@ -419,6 +427,11 @@ public void setDecompressor(Decompressor decompressor) {}
return serverStreamAttributes;
}

@Override
public String getAuthority() {
return InProcessStream.this.authority;
}

@Override
public StatsTraceContext statsTraceContext() {
return serverStatsTraceContext;
Expand Down Expand Up @@ -550,8 +563,7 @@ public void setMessageCompression(boolean enable) {}

@Override
public void setAuthority(String string) {
// TODO(ejona): Do something with this? Could be useful for testing, but can we "validate"
// it?
InProcessStream.this.authority = string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would not be thread safe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InProcessStream.this.authority is now volatile. Setting and reading references are atomic in Java. Strings are immutable. This should be thread safe now, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it was fine as it was before (without volatile). setAuthority "May only be called before start."

}

@Override
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public final boolean isReady() {
return Attributes.EMPTY;
}

@Override
public String getAuthority() {
return null;
}

@Override
public final void setListener(ServerStreamListener serverStreamListener) {
transportState().setListener(serverStreamListener);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ public void runInContext() {
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName);
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public interface ServerStream extends Stream {
*/
Attributes getAttributes();

/**
* Gets the authority this stream is addressed to.
* @return the authority string.
*/
String getAuthority();

/**
* Sets the server stream listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
@RunWith(JUnit4.class)
public class InProcessTransportTest extends AbstractTransportTest {
private static final String transportName = "perfect-for-testing";
private static final String authority = "a-testing-authority";

@Override
protected InternalServer newServer() {
Expand All @@ -52,8 +53,13 @@ protected InternalServer newServer(InternalServer server) {
return newServer();
}

@Override
protected String testAuthority(InternalServer server) {
return authority;
}

@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(transportName);
return new InProcessTransport(transportName, testAuthority(server));
}
}
2 changes: 2 additions & 0 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ public ServerCall.Listener<String> startCall(
assertEquals(1, executor.runDueTasks());
ServerCall<String, Integer> call = callReference.get();
assertNotNull(call);
verify(stream).getAuthority();

String order = "Lots of pizza, please";
streamListener.messageRead(STRING_MARSHALLER.stream(order));
Expand Down Expand Up @@ -604,6 +605,7 @@ public ServerCall.Listener<String> startCall(
verifyNoMoreInteractions(stream);

assertEquals(1, executor.runDueTasks());
verify(stream).getAuthority();
verify(stream).close(same(status), notNull(Metadata.class));
verify(stream, atLeast(1)).statsTraceContext();
verifyNoMoreInteractions(stream);
Expand Down
18 changes: 17 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;

import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -96,6 +98,7 @@ class NettyServerHandler extends AbstractNettyHandler {
private Throwable connectionError;
private boolean teWarningLogged;
private WriteQueue serverWriteQueue;
private AsciiString lastKnownAuthority;

static NettyServerHandler newHandler(ServerTransportListener transportListener,
int maxStreams,
Expand Down Expand Up @@ -190,8 +193,9 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
checkNotNull(transportListener.methodDetermined(method, metadata), "statsTraceCtx");
NettyServerStream.TransportState state = new NettyServerStream.TransportState(
this, http2Stream, maxMessageSize, statsTraceCtx);
String authority = getOrUpdateAuthority((AsciiString)headers.authority());
NettyServerStream stream = new NettyServerStream(ctx.channel(), state, attributes,
statsTraceCtx);
authority, statsTraceCtx);
transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated();
http2Stream.setProperty(streamKey, state);
Expand All @@ -205,6 +209,18 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
}
}

private String getOrUpdateAuthority(AsciiString authority) {
if (authority == null) {
return null;
} else if (!authority.equals(lastKnownAuthority)) {
lastKnownAuthority = authority;
}

// AsciiString.toString() is internally cached, so subsequent calls will not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not efficient enough. If the client changes the authority every request and the server doesn't read from it, it will unnecessarily convert it. Why not do the conversion when getAuthority is called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAuthority() is always called.

This is plenty efficient enough. It doesn't hinder the common case at all.

Most clients, by far, won't change the authority every request. And even if they do change constantly, then any optimization here would probably be in the noise.

I'd rather get to a point we can call good enough for now and any future optimization for edge cases gets included with the HPACK tricks we're planning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked with Carl. He showed me that getAuthority() isn't always called, since it is only called when the immutable lookup fails. But we also agreed that it will 99% of the time always be the same authority. And we also believe that the HPACK table will even make it the same AsciiString instance. So Carl is now okay with this. He would still like to have our nice HPACK stuff, but that's orthogonal and future work.

// result in recomputing the String representation of lastKnownAuthority.
return lastKnownAuthority.toString();
}

private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
flowControlPing().onDataRead(data.readableBytes(), padding);
Expand Down
9 changes: 8 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ class NettyServerStream extends AbstractServerStream {
private final Channel channel;
private final WriteQueue writeQueue;
private final Attributes attributes;
private final String authority;

public NettyServerStream(Channel channel, TransportState state, Attributes transportAttrs,
StatsTraceContext statsTraceCtx) {
String authority, StatsTraceContext statsTraceCtx) {
super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx);
this.state = checkNotNull(state, "transportState");
this.channel = checkNotNull(channel, "channel");
this.writeQueue = state.handler.getWriteQueue();
this.attributes = checkNotNull(transportAttrs);
this.authority = authority;
}

@Override
Expand All @@ -85,6 +87,11 @@ public Attributes getAttributes() {
return attributes;
}

@Override
public String getAuthority() {
return authority;
}

private class Sink implements AbstractServerStream.Sink {
@Override
public void request(final int numMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
NettyServerStream.TransportState state = new NettyServerStream.TransportState(
handler, http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx);
NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
statsTraceCtx);
"test-authority", statsTraceCtx);
stream.transportState().setListener(serverListener);
state.onStreamAllocated();
verify(serverListener, atLeastOnce()).onReady();
Expand Down
7 changes: 6 additions & 1 deletion netty/src/test/java/io/grpc/netty/NettyTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ protected InternalServer newServer(InternalServer server) {
.buildTransportServer();
}

@Override
protected String testAuthority(InternalServer server) {
return "localhost:" + server.getPort();
}

@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
int port = server.getPort();
return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port),
"localhost:" + port,
testAuthority(server),
null /* agent */);
}

Expand Down
7 changes: 6 additions & 1 deletion okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,17 @@ protected InternalServer newServer(InternalServer server) {
.flowControlWindow(65 * 1024));
}

@Override
protected String testAuthority(InternalServer server) {
return "127.0.0.1:" + server.getPort();
}

@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
int port = server.getPort();
return clientFactory.newClientTransport(
new InetSocketAddress("127.0.0.1", port),
"127.0.0.1:" + port,
testAuthority(server),
null /* agent */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public abstract class AbstractTransportTest {
*/
protected abstract ManagedClientTransport newClientTransport(InternalServer server);

/**
* Returns the authority string used by a client to connect to {@code server}.
*/
protected abstract String testAuthority(InternalServer server);

/**
* When non-null, will be shut down during tearDown(). However, it _must_ have been started with
* {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following
Expand Down Expand Up @@ -648,6 +653,25 @@ public void basicStream() throws Exception {
Lists.newArrayList(metadataCaptor.getValue().getAll(binaryKey)));
}

@Test
@SuppressWarnings("deprecation")
public void authorityPropagation() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);

Metadata clientHeaders = new Metadata();
ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders);
clientStream.start(mockClientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;

assertEquals(testAuthority(server), serverStream.getAuthority());
}

@Test
public void zeroMessageStream() throws Exception {
server.start(serverListener);
Expand Down