# 数据消费 (Data Consumption)

如何通过 Java SDK 消费 HStreamDB 中的数据

# 前提条件

确保有一个运行中并可用的 HStreamDB

# 概念

客户端可以从一个消费者对象中消费数据。 每一个消费者对象将通过 subscriptionID 来加入一个持有订阅的共享订阅组 (consumer group), 然后客户端可以从订阅的流中获取数据。

每个消费者对象包含一个 RawRecordReceiver 和一个 HRecordReceiver 。 由此,用户可以根据他们的需求来消费原始数据或 HRecords 。

当客户端收到一条 record 时,应当使用 responder.ack() 来返回一个 ack。HStreamDB 当前支 持的是at-least-once 消费语义,因此,若不返回 ack,经过 ackTimeoutSecond 后,这些 records 会被 server 重新发送。

# 消费 Records


// first, create a subscription for the stream
Subscription subscription =
    Subscription
        .newBuilder()
        .subscription("my_subscription")
        .stream("my_stream")
        .offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST))
        .ackTimeoutSeconds(600)
        .build();
client.createSubscription(subscription);

// second, create a consumer attach to the subscription
Consumer consumer =
    client
        .newConsumer()
        .subscription("my_subscription")
        .rawRecordReceiver(
            ((receivedRawRecord, responder) -> {
                System.out.println(receivedRawRecord.getRecordId());
                responder.ack();
            }))
        .build();

// third, start the consumer
consumer.startAsync().awaitRunning();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  • 示例中用了 rawRecordReceiver() 来消费原始数据,假如想要消费 HRecord,可以用 hRecordReceiver()