# 订阅

在 HStreamDB 中,订阅也被看作是一种持久的资源,它使得消费者和 stream 解耦。 用户可以创建、删除和列出订阅。

# 概念

HStreamDB 用订阅来管理消费的进度信息 (e.g. checkpoint, offset)。 每一个消费者开始消费前,必须加入一个已经存在的订阅

# 前提条件

确保有一个运行中并可用的 HStreamDB

# 创建一个新的订阅

通过 HStreamClient.createSubscription(Subscription) 可以创建一个新的订阅:


Subscription subscription =
    Subscription
        .newBuilder()
        .subscription("my_subscription")
        .stream("my_stream")
        .offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST))
        .ackTimeoutSeconds(600)
        .build();
client.createSubscription(subscription);

1
2
3
4
5
6
7
8
9
10
11

其中可以给 SubscriptionOffset 以下三种值:


// consume from the start of the stream
SubscriptionOffset.SpecialOffset offset = SubscriptionOffset.SpecialOffset.EARLIST;

// consume from the tail of the stream
SubscriptionOffset.SpecialOffset offset = SubscriptionOffset.SpecialOffset.LATEST;

// consume from RecordId with specified LSN and offset
RecordId rid = new RecordId(1, 2);

1
2
3
4
5
6
7
8
9
10

# 列出所有存在的订阅

通过 HStreamClient.listSubscriptions() 可以拿到所有存在的订阅列表:


List<Subscription> subscriptions = client.listSubscriptions();
for (Subscription subscription : subscriptions) {
  System.out.println(subscription);
  System.out.println(subscription.getSubscriptionId());
}

1
2
3
4
5
6
7

# 删除一个订阅

通过 HStreamClient.deleteSubscription() 可以删除一个订阅:


client.deleteSubscription("test_subscription");

1
2
3