Skip to content

Commit f3c93fa

Browse files
feat: adding support for dead letter queues (#60)
* feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. * Fix formatting * fix: making changes requested in pull request
1 parent 1b8405a commit f3c93fa

File tree

4 files changed

+34
-2
lines changed

4 files changed

+34
-2
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,17 @@ private void processBatch(List<OutstandingMessage> batch) {
341341
// This should be a blocking flow controller and never throw an exception.
342342
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
343343
}
344-
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
344+
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
345345
}
346346
}
347347

348+
private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
349+
return PubsubMessage.newBuilder(receivedMessage.getMessage())
350+
.putAttributes(
351+
"googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt()))
352+
.build();
353+
}
354+
348355
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
349356
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
350357
final AckReplyConsumer consumer =

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.google.common.base.Preconditions;
4545
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4646
import com.google.pubsub.v1.ProjectSubscriptionName;
47+
import com.google.pubsub.v1.PubsubMessage;
4748
import java.io.IOException;
4849
import java.util.ArrayList;
4950
import java.util.List;
@@ -205,6 +206,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
205206
return new Builder(subscription, receiver);
206207
}
207208

209+
/** Returns the delivery attempt count for a received {@link PubsubMessage} */
210+
public static int getDeliveryAttempt(PubsubMessage message) {
211+
return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0"));
212+
}
213+
208214
/** Subscription which the subscriber is subscribed to. */
209215
public String getSubscriptionNameString() {
210216
return subscriptionName;

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@
3737
import org.threeten.bp.Duration;
3838

3939
public class MessageDispatcherTest {
40+
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
41+
private static final int DELIVERY_INFO_COUNT = 3;
4042
private static final ReceivedMessage TEST_MESSAGE =
4143
ReceivedMessage.newBuilder()
4244
.setAckId("ackid")
43-
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
45+
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
46+
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
4447
.build();
4548
private static final Runnable NOOP_RUNNABLE =
4649
new Runnable() {
@@ -78,6 +81,9 @@ public void setUp() {
7881
new MessageReceiver() {
7982
@Override
8083
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
84+
assertThat(message.getData()).isEqualTo(MESSAGE_DATA);
85+
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
86+
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
8187
consumers.add(consumer);
8288
}
8389
};

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,19 @@ public void tearDown() throws Exception {
8484
testChannel.shutdown();
8585
}
8686

87+
@Test
88+
public void testDeliveryAttemptHelper() {
89+
int deliveryAttempt = 3;
90+
PubsubMessage message =
91+
PubsubMessage.newBuilder()
92+
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
93+
.build();
94+
assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt);
95+
96+
PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
97+
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0);
98+
}
99+
87100
@Test
88101
public void testOpenedChannels() throws Exception {
89102
int expectedChannelCount = 1;

0 commit comments

Comments
 (0)