# 创建和管理 Stream
# 命名资源准则
一个HStream资源的名称可以唯一地识别一个 HStream 资源,如一个 stream、 subscription 或 reader。 资源名称必须符合以下要求:
- 以一个字母开头
- 长度必须不超过255个字符
- 只包含以下字符。字母
[A-Za-z]
,数字[0-9]
。 破折号-
,下划线_
。
*用于资源名称作为SQL语句的一部分的情况。例如在 HStream SQL Shell 中或者用 SQL 创建 IO 任务时, 将会出现资源名称无法被正确解析的情况(如与关键词冲突),此时需要用户用反斜线 `
,括住资源名称。这个限制或将会在日后的版本中被改进移除。
# Stream 的属性
Replication factor
为了容错性和更高的可用性,每个 Stream 都可以在集群中的节点之间进行复制。一个常 用的生产环境 Replication factor 配置是为 3,也就是说,你的数据总是有三个副本, 这在出错或你想对 Server 进行维护时将会很有帮助。这种复制是以 Stream 为单位上进 行的。
Backlog Retention
该配置控制 HStreamDB 的 Stream 中的 records 被写入后保留的时间。当超过 retention 保留的时间后,HStreamDB 将会清理这些 records,不管它是否被消费过。
- 默认值=7 天
- 最小值=1 秒
- 最大值=21 天
# 创建一个 stream
在你写入 records 或者 创建一个订阅之前先创建一个 stream。
// CreateStreamExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
public class CreateStreamExample {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String serviceUrl = "127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName1 = "your_h_records_stream_name";
String streamName2 = "your_raw_records_stream_name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
createStreamExample(client, streamName1);
createStreamWithAttrsExample(client, streamName2);
client.close();
}
public static void createStreamExample(HStreamClient client, String streamName) {
client.createStream(streamName);
}
public static void createStreamWithAttrsExample(HStreamClient client, String streamName) {
client.createStream(
streamName,
(short) 1 // replication factor
,
10 // Number of shards
,
7 * 24 * 3600 // backlog retention time in seconds
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ExampleCreateStream.go
package examples
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
)
func ExampleCreateStream() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
// Create a stream, only specific streamName
if err = client.CreateStream("testDefaultStream"); err != nil {
log.Fatalf("Creating stream error: %s", err)
}
// Create a new stream with 1 replica, 5 shards, set the data retention to 1800s.
err = client.CreateStream("testStream",
hstream.WithReplicationFactor(1),
hstream.EnableBacklog(1800),
hstream.WithShardCount(5))
if err != nil {
log.Fatalf("Creating stream error: %s", err)
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def create_stream(client):
await client.create_stream(
stream_name, replication_factor=1, backlog=24 * 60 * 60, shard_count=1
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 删除一个 Stream
只有当一个 Stream 没有所属的订阅时才允许被删除,除非传一个强制标删除的 flag 。
# 强制删除一个 Stream
如果你需要删除一个有订阅的 stream 时,请启用强制删除。在强制删除一个 stream 后, 原来 stream 的订阅仍然可以从 backlog 中读取数据。这些订阅的 stream 名字会变成 __deleted_stream__
。同时,我们并不允许在被删除的 stream 上创建新的订阅,也不允 许向该 stream 写入新的 record。
// DeleteStreamExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
public class DeleteStreamExample {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
// String serviceUrl = "your-service-url-address";
String serviceUrl = "127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName1 = "your_h_records_stream_name";
String streamName2 = "your_raw_records_stream_name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
deleteStreamExample(client, streamName1);
deleteStreamForceExample(client, streamName2);
client.close();
}
public static void deleteStreamExample(HStreamClient client, String streamName) {
client.deleteStream(streamName);
}
public static void deleteStreamForceExample(HStreamClient client, String streamName) {
client.deleteStream(streamName, true);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ExampleDeleteStream.go
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleDeleteStream() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
// force delete stream and ignore none exist stream
if err := client.DeleteStream("testStream",
hstream.EnableForceDelete,
hstream.EnableIgnoreNoneExist); err != nil {
log.Fatalf("Deleting stream error: %s", err)
}
if err := client.DeleteStream("testDefaultStream"); err != nil {
log.Fatalf("Deleting stream error: %s", err)
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def delete_stream(client):
await client.delete_stream(stream_name, ignore_non_exist=True, force=True)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 列出所有 stream 信息
可以如下拿到所有 HStream 中的 stream:
// ListStreamsExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Stream;
import java.util.List;
public class ListStreamsExample {
public static void main(String[] args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String serviceUrl = "127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
listStreamExample(client);
client.close();
}
public static void listStreamExample(HStreamClient client) {
List<Stream> streams = client.listStreams();
for (Stream stream : streams) {
System.out.println(stream.getStreamName());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ExampleListStreams.go
package examples
import (
"fmt"
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleListStreams() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
streams, err := client.ListStreams()
if err != nil {
log.Fatalf("Listing streams error: %s", err)
}
for _, stream := range streams {
fmt.Printf("%+v\n", stream)
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def list_streams(client):
ss = await client.list_streams()
for s in ss:
print(s)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23