
Pub/Sub: messages stuck in buffer, preventing proper load balancing #17

@kir-titievsky

Description


Repro:

  1. Publish 60 messages, with the numbers 1 through 60 as message content. Observe that the subscription's total backlog reaches 60 messages and 23 bytes.
  2. Start an instance of a subscriber client with a single-threaded executor and flow control limited to one outstanding message (see code below). The subscriber takes 10 seconds to process each message.
  3. Observe that the subscriber processes messages one at a time, every 10 seconds (see log output below).
  4. The bug: roughly three minutes later, start two more instances of the same subscriber client. Observed: they process no messages. Expected: the two new subscribers immediately start processing messages.
  5. Stop the first subscriber client. Observed: the other two subscriber clients start processing messages.
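For scale, here is a back-of-the-envelope sketch of how long the backlog should take to drain. It assumes the 60 messages are split as evenly as possible across subscribers, each handling one message per 10 seconds, with no redelivery; `DrainTime` and `drainSeconds` are illustrative names, not part of any library.

```java
public class DrainTime {
    // Seconds needed to drain `messages` when `subscribers` clients each process
    // one message at a time, taking `secondsPerMessage` per message.
    // Assumes an even split of the backlog (ceiling division for the remainder).
    static long drainSeconds(int messages, int subscribers, int secondsPerMessage) {
        long perSubscriber = (messages + subscribers - 1) / subscribers;
        return perSubscriber * secondsPerMessage;
    }

    public static void main(String[] args) {
        System.out.println(drainSeconds(60, 1, 10)); // 600 (one subscriber)
        System.out.println(drainSeconds(60, 3, 10)); // 200 (three subscribers)
    }
}
```

With proper load balancing, adding the two subscribers in step 4 should cut the remaining drain time to roughly a third; in the repro they add nothing until the first client is stopped.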

The hypothesis is that the entire backlog is stuck in gRPC and other buffers between the server and the client. The server considers those messages delivered and being processed, while the client code cannot actually see them yet. When new clients connect, the server has nothing left to send them. Killing the original client effectively "nacks" the buffered messages, because closing the stream signals the server that they were never processed. This allows the server to start sending them to the other clients.
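The hypothesis can be made concrete with a toy model. This is a sketch under stated assumptions, not the client library's or the server's actual dispatch logic: it assumes the server pushes round-robin to any stream with free buffer space, that each stream can hold `bufferCapacity` unacked messages regardless of the client's flow control settings, and that closing a stream returns its buffered messages to the backlog. `BufferHoardingModel`, `lateJoiners` and the capacity values are hypothetical names and numbers chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BufferHoardingModel {

    // A toy subscriber: messages pushed by the server wait in `buffer` (standing in
    // for the gRPC/transport buffers) until the one-thread handler takes one.
    static final class Client {
        final Deque<Integer> buffer = new ArrayDeque<>();
        int processed = 0;
    }

    private final Deque<Integer> backlog = new ArrayDeque<>();
    private final List<Client> clients = new ArrayList<>();
    // Hypothetical: how many unacked messages one stream may hold, independent of
    // the client's FlowControlSettings.
    private final int bufferCapacity;

    BufferHoardingModel(int messages, int bufferCapacity) {
        for (int i = 1; i <= messages; i++) backlog.add(i);
        this.bufferCapacity = bufferCapacity;
    }

    Client connect() {
        Client c = new Client();
        clients.add(c);
        dispatch();
        return c;
    }

    // Closing the stream acts like a nack: buffered messages return to the backlog.
    void disconnect(Client c) {
        clients.remove(c);
        backlog.addAll(c.buffer);
        c.buffer.clear();
        dispatch();
    }

    // The server pushes round-robin to every stream that still has buffer space.
    private void dispatch() {
        boolean pushed = true;
        while (pushed && !backlog.isEmpty()) {
            pushed = false;
            for (Client c : clients) {
                if (!backlog.isEmpty() && c.buffer.size() < bufferCapacity) {
                    c.buffer.add(backlog.poll());
                    pushed = true;
                }
            }
        }
    }

    // One 10-second step: each handler processes and acks one buffered message.
    void tick() {
        for (Client c : clients) {
            if (c.buffer.poll() != null) c.processed++;
        }
        dispatch();
    }

    // Replays the repro with 60 messages: "first" runs alone for 18 steps, two more
    // clients join for 4 steps, then "first" is killed and one more step runs.
    // Returns messages processed by the two late joiners {before kill, after kill}.
    static int[] lateJoiners(int bufferCapacity) {
        BufferHoardingModel m = new BufferHoardingModel(60, bufferCapacity);
        Client first = m.connect();
        for (int i = 0; i < 18; i++) m.tick();
        Client second = m.connect();
        Client third = m.connect();
        for (int i = 0; i < 4; i++) m.tick();
        int before = second.processed + third.processed;
        m.disconnect(first);
        m.tick();
        return new int[] {before, second.processed + third.processed};
    }

    public static void main(String[] args) {
        int[] deep = lateJoiners(1000); // buffer deep enough to hold the whole backlog
        int[] shallow = lateJoiners(1); // one message per stream
        System.out.println("deep: " + deep[0] + " -> " + deep[1]);          // deep: 0 -> 2
        System.out.println("shallow: " + shallow[0] + " -> " + shallow[1]); // shallow: 8 -> 10
    }
}
```

With a buffer deep enough to hold the whole backlog, the late joiners process nothing until the first client is killed, which is the pattern in the logs. With a one-message buffer per stream, they share the work from the moment they connect, which is the expected behavior in step 4.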

```java
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import org.threeten.bp.ZonedDateTime;
import org.threeten.bp.format.DateTimeFormatter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Sub {

    private static final AtomicInteger messageCounter = new AtomicInteger(0);
    // Start time of this process (HHmmss), used to tell subscriber instances apart in the logs.
    private static final String appInstanceId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HHmmss"));

    static class MessageHandler implements MessageReceiver {

        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            try {
                // Simulate 10 seconds of work per message.
                TimeUnit.SECONDS.sleep(10);
                System.out.println(
                        ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + ",\t"
                        + "App instance id" + appInstanceId
                        + ",\tProcessing id: " + messageCounter.incrementAndGet()
                        + ",\tmessage content:" + message.getData().toStringUtf8()
                );
                consumer.ack();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let Pub/Sub redeliver the message.
                Thread.currentThread().interrupt();
                consumer.nack();
            }
        }
    }

    /** Receives messages over a subscription. Usage: Sub PROJECT_ID SUBSCRIPTION_ID */
    public static void main(String... args) throws Exception {
        String projectId = args[0];
        String subscriptionId = args[1]; // e.g. my-sub
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
        Subscriber subscriber = null;
        try {
            // Single-threaded subscriber with the most restrictive flow control setting:
            // at most one outstanding (unacked) message.
            subscriber =
                    Subscriber.newBuilder(subscriptionName, new MessageHandler())
                            .setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1L).build())
                            .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build())
                            .build();
            // Blocks until the subscriber terminates (e.g. the process is interrupted).
            subscriber.startAsync().awaitTerminated();
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }
}
```
```xml
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>0.42.1-beta</version>
</dependency>
```
Logs
# start first client at 17:00:34
17:00:47,	App instance id170034,	Processing id: 1,	message content:1
17:00:57,	App instance id170034,	Processing id: 2,	message content:5
17:01:07,	App instance id170034,	Processing id: 3,	message content:9
17:01:17,	App instance id170034,	Processing id: 4,	message content:4
17:01:27,	App instance id170034,	Processing id: 5,	message content:8
17:01:37,	App instance id170034,	Processing id: 6,	message content:3
17:01:47,	App instance id170034,	Processing id: 7,	message content:7
17:01:57,	App instance id170034,	Processing id: 8,	message content:2
17:02:07,	App instance id170034,	Processing id: 9,	message content:6
17:02:17,	App instance id170034,	Processing id: 10,	message content:13
17:02:27,	App instance id170034,	Processing id: 11,	message content:17
17:02:37,	App instance id170034,	Processing id: 12,	message content:12
17:02:47,	App instance id170034,	Processing id: 13,	message content:16
17:02:57,	App instance id170034,	Processing id: 14,	message content:11
17:03:07,	App instance id170034,	Processing id: 15,	message content:15
17:03:17,	App instance id170034,	Processing id: 16,	message content:10
17:03:27,	App instance id170034,	Processing id: 17,	message content:14
17:03:37,	App instance id170034,	Processing id: 18,	message content:22

# start second and third clients around here: they generate no logs

17:03:47,	App instance id170034,	Processing id: 19,	message content:26
17:03:57,	App instance id170034,	Processing id: 20,	message content:21
17:04:07,	App instance id170034,	Processing id: 21,	message content:25
17:04:17,	App instance id170034,	Processing id: 22,	message content:19
# kill first client

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
# second client logs (note they start "10 seconds" after the first client is killed)
17:04:35,	App instance id170328,	Processing id: 1,	message content:39
17:04:45,	App instance id170328,	Processing id: 2,	message content:47
17:04:55,	App instance id170328,	Processing id: 3,	message content:20
17:05:05,	App instance id170328,	Processing id: 4,	message content:40
17:05:15,	App instance id170328,	Processing id: 5,	message content:38
17:05:25,	App instance id170328,	Processing id: 6,	message content:46
17:05:35,	App instance id170328,	Processing id: 7,	message content:27
17:05:45,	App instance id170328,	Processing id: 8,	message content:42
17:05:55,	App instance id170328,	Processing id: 9,	message content:55
17:06:05,	App instance id170328,	Processing id: 10,	message content:48
17:06:15,	App instance id170328,	Processing id: 11,	message content:56
17:06:25,	App instance id170328,	Processing id: 12,	message content:54
17:06:35,	App instance id170328,	Processing id: 13,	message content:49
17:06:45,	App instance id170328,	Processing id: 14,	message content:57
17:07:39,	App instance id170328,	Processing id: 15,	message content:18
17:07:49,	App instance id170328,	Processing id: 16,	message content:23
17:08:19,	App instance id170328,	Processing id: 17,	message content:30
17:08:29,	App instance id170328,	Processing id: 18,	message content:34
17:09:39,	App instance id170328,	Processing id: 19,	message content:32


Metadata

Labels

api: pubsub (Issues related to the googleapis/java-pubsub API.)
type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design.)