Create and Manage Subscriptions
Attributes of a Subscription
ackTimeoutSeconds.
Specifies the max amount of time for the server to mark the record as unacknowledged, after which the record will be sent again.
maxUnackedRecords.
The maximum amount of unacknowledged records allowed. After exceeding the size set, the server will stop sending records to corresponding consumers.
Create a subscription
Every subscription has to specify which stream to subscribe to, which means you have to make sure the stream to be subscribed has already been created.
For the subscription name, please refer to the guidelines to name a resource
When creating a subscription, you can provide the attributes mentioned like this:
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Subscription;
public class CreateSubscriptionExample {
public static void main(String[] args) throws Exception {
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName = "your_h_records_stream_name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
createSubscriptionExample(client, streamName);
client.close();
}
public static void createSubscriptionExample(HStreamClient client, String streamName) {
String subscriptionId = "your_subscription_id";
Subscription subscription =
Subscription.newBuilder().subscription(subscriptionId).stream(streamName)
.ackTimeoutSeconds(600)
.maxUnackedRecords(10000)
.build();
client.createSubscription(subscription);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleCreateSubscription() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
streamName := "testStream"
subId0 := "SubscriptionId0"
subId1 := "SubscriptionId1"
if err = client.CreateSubscription(subId0, streamName,
hstream.WithAckTimeout(60),
hstream.WithMaxUnackedRecords(10000),
hstream.WithOffset(hstream.EARLIEST)); err != nil {
log.Fatalf("Creating subscription error: %s", err)
}
if err = client.CreateSubscription(subId1, streamName,
hstream.WithAckTimeout(600),
hstream.WithMaxUnackedRecords(5000),
hstream.WithOffset(hstream.LATEST)); err != nil {
log.Fatalf("Creating subscription error: %s", err)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio
import hstreamdb
import os
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def create_subscription(client):
await client.create_subscription(
subscription,
stream_name,
ack_timeout=600,
max_unacks=10000,
offset=hstreamdb.SpecialOffset.EARLIEST,
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Delete a subscription
To delete a subscription without the force flag, you need to make sure that there is no active subscription consumer.
Delete a subscription with the force flag
If you do want to delete a subscription with running consumers, enable force deletion. While force deleting a subscription, the subscription will be in deleting state and closing running consumers, which means you will not be able to join, delete or create a subscription with the same name. After the deletion completes, you can create a subscription with the same name. However, this new subscription will be a brand new subscription. Even if they subscribe to the same stream, this new subscription will not share the consumption progress with the deleted subscription.
package docs.code.examples;
import io.hstream.HStreamClient;
public class DeleteSubscriptionExample {
public static void main(String[] args) throws Exception {
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String subscriptionId = "your_subscription_id";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
deleteSubscriptionExample(client, subscriptionId);
client.close();
}
public static void deleteSubscriptionExample(HStreamClient client, String subscriptionId) {
client.deleteSubscription(subscriptionId);
}
public static void deleteSubscriptionForceExample(HStreamClient client, String subscriptionId) {
client.deleteSubscription(subscriptionId, true);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleDeleteSubscription() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
subId0 := "SubscriptionId0"
subId1 := "SubscriptionId1"
if err = client.DeleteSubscription(subId0, true); err != nil {
log.Fatalf("Force deleting subscription error: %s", err)
}
if err = client.DeleteSubscription(subId1, false); err != nil {
log.Fatalf("Deleting subscription error: %s", err)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import hstreamdb
import os
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def delete_subscription(client):
await client.delete_subscription(subscription, force=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
List subscriptions
To list all subscriptions in HStream
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Subscription;
import java.util.List;
public class ListSubscriptionsExample {
public static void main(String[] args) throws Exception {
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
listSubscriptionExample(client);
client.close();
}
public static void listSubscriptionExample(HStreamClient client) {
List<Subscription> subscriptions = client.listSubscriptions();
for (Subscription subscription : subscriptions) {
System.out.println(subscription.getSubscriptionId());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package examples
import (
"fmt"
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
func ExampleListSubscriptions() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
subscriptions, err := client.ListSubscriptions()
if err != nil {
log.Fatalf("Listing subscriptions error: %s", err)
}
for _, sub := range subscriptions {
fmt.Printf("%+v\n", sub)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import hstreamdb
import os
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def list_subscriptions(client):
subscriptions = await client.list_subscriptions()
for s in subscriptions:
print(s)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23