Get Records from Shards of the Stream with Reader
What is a Reader
To allow users to retrieve data from any stream shard, HStreamDB provides readers for applications to manually manage the exact position of the record to read from. Unlike subscription and consumption, a reader can be seen as a lower-level API for getting records from streams. It gives users direct access to any record in the stream (more precisely, any record from a specific shard in the stream). It does not require or rely on subscriptions, and it does not send any acknowledgement back to the server. Therefore, the reader is helpful in cases that require more flexibility or the ability to rewind and re-read data.
When a user creates a reader instance, the user must specify which shard the reader reads from and which record it begins at. A reader supports the following options for the starting position:
- The earliest available record in the shard
- The latest available record in the shard
- User-specified record location in the shard
Reader Example
To read from the shards, users are required to get the desired shard id with listShards.
The name of a reader should also follow the format specified by the guidelines.
package docs.code.examples;
import static io.hstream.StreamShardOffset.SpecialOffset.EARLIEST;
import io.hstream.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * Example that builds a {@code Reader} on one shard of a stream and prints the records returned
 * by a single read.
 *
 * <p>The service URL defaults to {@code hstream://127.0.0.1:6570} and can be overridden with the
 * {@code serviceUrl} environment variable.
 */
public class ReadDataWithReaderExample {
  /**
   * Creates a client, runs {@link #readTheFirstRecordInShard}, and closes the client afterwards.
   *
   * @param args unused
   * @throws Exception if building, using or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlOverride = System.getenv("serviceUrl");
    if (serviceUrlOverride != null) {
      serviceUrl = serviceUrlOverride;
    }

    String streamName = "your_h_records_stream_name";
    // Placeholder shard id: replace it with an id of a shard that belongs to the stream.
    long shardId = 0;

    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    try {
      readTheFirstRecordInShard(client, streamName, shardId);
    } finally {
      // Release the client even when the read above throws.
      client.close();
    }
  }

  /**
   * Builds a reader positioned at the {@code EARLIEST} offset of the given shard, issues one
   * {@code read(10)} call and prints the result.
   *
   * <p>The call blocks on {@link CompletableFuture#join()}, so a failed read surfaces as an
   * unchecked {@link java.util.concurrent.CompletionException}.
   *
   * @param client client used to create the reader
   * @param streamName name of the stream that owns the shard
   * @param shardId id of the shard to read from
   */
  public static void readTheFirstRecordInShard(
      HStreamClient client, String streamName, long shardId) {
    StreamShardOffset offset = new StreamShardOffset(EARLIEST);
    // NOTE(review): the reader is never closed here; confirm whether Reader exposes close()
    // and release it after the read if so.
    Reader reader =
        client
            .newReader()
            .readerId("your_reader_id")
            .streamName(streamName)
            .shardId(shardId)
            .shardOffset(offset)
            .timeoutMs(1000)
            .build();
    CompletableFuture<List<ReceivedRecord>> records = reader.read(10);
    System.out.println("Read records: " + records.join());
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package examples
import (
	"context"
	"fmt"
	"log"

	"github.com/hstreamdb/hstreamdb-go/hstream"
)
// ExampleReadDataWithReader creates a shard reader on the first shard of
// "testDefaultStream" and logs records until at least 100 have been read.
//
// Setup failures (client creation, shard listing, reader creation) are
// returned to the caller instead of terminating the process, so the deferred
// cleanup registered below still runs.
func ExampleReadDataWithReader() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		return fmt.Errorf("create client error: %w", err)
	}
	defer client.Close()

	streamName := "testDefaultStream"
	readerId := "shardReader"

	shards, err := client.ListShards(streamName)
	if err != nil {
		return fmt.Errorf("list shards error: %w", err)
	}
	// Guard the shards[0] access below against an empty result.
	if len(shards) == 0 {
		return fmt.Errorf("list shards error: stream %q has no shards", streamName)
	}
	shardId := shards[0].ShardId

	reader, err := client.NewShardReader(streamName, readerId, shardId, hstream.WithReaderTimeout(100))
	if err != nil {
		return fmt.Errorf("create reader error: %w", err)
	}
	// Deferred calls run last-in-first-out: reader.Close(), then
	// DeleteShardReader, then client.Close().
	defer client.DeleteShardReader(shardId, readerId)
	defer reader.Close()

	count := 0
	for {
		records, err := reader.Read(context.Background())
		if err != nil {
			// NOTE(review): a persistent read error is retried immediately and
			// without limit; consider a backoff or a retry cap.
			log.Printf("Reader read error: %s\n", err.Error())
			continue
		}

		for _, record := range records {
			log.Printf("Reader read record [%s]:%v", record.GetRecordId().String(), record.GetPayload())
		}

		count += len(records)
		if count >= 100 {
			break
		}
	}

	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import hstreamdb
import os
# Connection settings, overridable through the GUIDE_HOST / GUIDE_PORT
# environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# os.getenv returns a str whenever the variable is set, so convert explicitly
# to keep `port` an int regardless of where the value came from.
port = int(os.getenv("GUIDE_PORT", "6570"))
# Placeholder names used by the examples in this file.
stream_name = "your_stream"
subscription = "your_subscription"
async def main(*funcs):
    """Open an insecure client for ``host``/``port`` and run each callable.

    Every item in ``funcs`` is awaited in order with the client as its only
    argument; the client context is exited once all of them have finished.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for step in funcs:
            await step(client)
async def read_reader(client):
    """Read once from ``stream_name`` starting at EARLIEST and print payloads.

    Opens a reader via ``client.with_reader``, calls ``reader.read(10)`` a
    single time, and prints each returned record's payload with its index.
    """
    start = hstreamdb.ShardOffset()
    start.specialOffset = hstreamdb.SpecialOffset.EARLIEST
    batch_limit = 10
    # NOTE(review): 1000 is presumably the reader timeout in milliseconds;
    # confirm against the with_reader signature.
    reader_ctx = client.with_reader(stream_name, "your_reader_id", start, 1000)
    async with reader_ctx as reader:
        index = 0
        for rec in await reader.read(batch_limit):
            print(f"[{index}] payload: {rec.payload}")
            index += 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29