3535import com .google .api .gax .core .ExecutorProvider ;
3636import com .google .api .gax .core .FixedExecutorProvider ;
3737import com .google .api .gax .core .InstantiatingExecutorProvider ;
38+ import com .google .api .gax .grpc .GrpcCallContext ;
3839import com .google .api .gax .retrying .RetrySettings ;
3940import com .google .api .gax .rpc .HeaderProvider ;
4041import com .google .api .gax .rpc .NoHeaderProvider ;
5051import com .google .pubsub .v1 .PubsubMessage ;
5152import com .google .pubsub .v1 .TopicName ;
5253import com .google .pubsub .v1 .TopicNames ;
54+ import io .grpc .CallOptions ;
5355import java .io .IOException ;
5456import java .util .ArrayList ;
5557import java .util .HashMap ;
8991public class Publisher implements PublisherInterface {
9092 private static final Logger logger = Logger .getLogger (Publisher .class .getName ());
9193
94+ private static final String GZIP_COMPRESSION = "gzip" ;
95+
9296 private final String topicName ;
9397
9498 private final BatchingSettings batchingSettings ;
@@ -114,6 +118,12 @@ public class Publisher implements PublisherInterface {
114118
115119 private MessageFlowController flowController = null ;
116120
121+ private final boolean enableCompression ;
122+ private final long compressionBytesThreshold ;
123+
124+ private final GrpcCallContext publishContext ;
125+ private final GrpcCallContext publishContextWithCompression ;
126+
117127 /** The maximum number of messages in one request. Defined by the API. */
118128 public static long getApiMaxRequestElementCount () {
119129 return 1000L ;
@@ -140,6 +150,8 @@ private Publisher(Builder builder) throws IOException {
140150
141151 this .enableMessageOrdering = builder .enableMessageOrdering ;
142152 this .messageTransform = builder .messageTransform ;
153+ this .enableCompression = builder .enableCompression ;
154+ this .compressionBytesThreshold = builder .compressionBytesThreshold ;
143155
144156 messagesBatches = new HashMap <>();
145157 messagesBatchLock = new ReentrantLock ();
@@ -191,6 +203,10 @@ private Publisher(Builder builder) throws IOException {
191203 backgroundResources = new BackgroundResourceAggregation (backgroundResourceList );
192204 shutdown = new AtomicBoolean (false );
193205 messagesWaiter = new Waiter ();
206+ this .publishContext = GrpcCallContext .createDefault ();
207+ this .publishContextWithCompression =
208+ GrpcCallContext .createDefault ()
209+ .withCallOptions (CallOptions .DEFAULT .withCompression (GZIP_COMPRESSION ));
194210 }
195211
196212 /** Topic which the publisher publishes to. */
@@ -431,13 +447,18 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
431447 }
432448
433449 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
450+ GrpcCallContext context = publishContext ;
451+ if (enableCompression && outstandingBatch .batchSizeBytes >= compressionBytesThreshold ) {
452+ context = publishContextWithCompression ;
453+ }
434454 return publisherStub
435455 .publishCallable ()
436456 .futureCall (
437457 PublishRequest .newBuilder ()
438458 .setTopic (topicName )
439459 .addAllMessages (outstandingBatch .getMessages ())
440- .build ());
460+ .build (),
461+ context );
441462 }
442463
443464 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
@@ -688,6 +709,8 @@ public static final class Builder {
688709 InstantiatingExecutorProvider .newBuilder ()
689710 .setExecutorThreadCount (THREADS_PER_CPU * Runtime .getRuntime ().availableProcessors ())
690711 .build ();
712+ static final boolean DEFAULT_ENABLE_COMPRESSION = false ;
713+ static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L ;
691714
692715 String topicName ;
693716 private String endpoint = PublisherStubSettings .getDefaultEndpoint ();
@@ -717,6 +740,9 @@ public PubsubMessage apply(PubsubMessage input) {
717740 }
718741 };
719742
743+ private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION ;
744+ private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD ;
745+
720746 private Builder (String topic ) {
721747 this .topicName = Preconditions .checkNotNull (topic );
722748 }
@@ -827,6 +853,21 @@ public Builder setEndpoint(String endpoint) {
827853 return this ;
828854 }
829855
856+ /** Gives the ability to enable transport compression. */
857+ public Builder setEnableCompression (boolean enableCompression ) {
858+ this .enableCompression = enableCompression ;
859+ return this ;
860+ }
861+
862+ /**
863+ * Sets the threshold (in bytes) above which messages are compressed for transport. Only takes
864+ * effect if setEnableCompression(true) is also called."
865+ */
866+ public Builder setCompressionBytesThreshold (long compressionBytesThreshold ) {
867+ this .compressionBytesThreshold = compressionBytesThreshold ;
868+ return this ;
869+ }
870+
830871 /** Returns the default BatchingSettings used by the client if settings are not provided. */
831872 public static BatchingSettings getDefaultBatchingSettings () {
832873 return DEFAULT_BATCHING_SETTINGS ;
0 commit comments