# Write Records to Streams

This document provides information about how to write data to streams in HStreamDB using hstreamdb-java.

To write data to HStreamDB, we need to pack messages as HStream Records and create a producer that sends them to the servers.

# HStream Record

All data in streams are in the form of an HStream Record. There are two kinds of HStream Record:

  • HRecord: You can think of an HRecord as a piece of JSON data, similar to a document in some NoSQL databases.

  • Raw Record: Arbitrary binary data.
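
Both kinds are wrapped in a Record before being written. Below is a minimal sketch using the same hstreamdb-java builder APIs as the full examples later on this page; the class name is only illustrative.

```java
// Sketch: constructing the two kinds of HStream Record with hstreamdb-java.
import io.hstream.HRecord;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;

public class RecordKindsSketch {
  public static void main(String[] args) {
    // HRecord: structured, JSON-like data.
    HRecord hRecord = HRecord.newBuilder().put("id", 10).put("isReady", true).build();
    Record jsonLikeRecord = Record.newBuilder().hRecord(hRecord).build();

    // Raw Record: arbitrary bytes, e.g. a UTF-8 encoded string.
    Record rawRecord =
        Record.newBuilder().rawRecord("hello".getBytes(StandardCharsets.UTF_8)).build();
  }
}
```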

# Write HStream Records

There are two ways to write records to servers. For simplicity, you can start with the Producer returned by client.newProducer(). This producer provides no configuration options: it sends records to the servers as soon as possible, and all records are sent in parallel, so their order is not guaranteed. In practice, the BufferedProducer returned by client.newBufferedProducer() is almost always the better choice. A BufferedProducer buffers records in order into batches and sends each batch to the servers. When a record is written to the stream, the HStream server generates a corresponding record id for it and sends the id back to the client. The record id is unique within the stream.

# Write Records Using Producer

```java
// WriteDataSimpleExample.java

package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteDataSimpleExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }

    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // We do not recommend writing both raw data and HRecord data to the same stream.
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();

    writeHRecordData(client, streamName1);
    writeRawData(client, streamName2);
    client.close();
  }

  public static void writeHRecordData(HStreamClient client, String streamName) {
    // Create a basic producer for low latency scenarios
    // For high throughput scenarios, please see the next section "Using `BufferedProducer`s"
    Producer producer = client.newProducer().stream(streamName).build();

    HRecord hRecord =
        HRecord.newBuilder()
            // number
            .put("id", 10)
            // Boolean
            .put("isReady", true)
            // List
            .put("targets", HArray.newBuilder().add(1).add(2).add(3).build())
            // String
            .put("name", "h".repeat(100))
            .build();

    for (int i = 0; i <= 3000; i++) {
      Record record = Record.newBuilder().hRecord(hRecord).build();
      // If the record is written successfully, the future completes with a server-assigned record id
      CompletableFuture<String> recordId = producer.write(record);
      System.out.println("Wrote message ID: " + recordId.join());
    }
  }

  private static void writeRawData(HStreamClient client, String streamName) {
    Producer producer = client.newProducer().stream(streamName).build();
    List<String> messages = Arrays.asList("first", "second");
    for (final String message : messages) {
      Record record =
          Record.newBuilder().rawRecord(message.getBytes(StandardCharsets.UTF_8)).build();
      CompletableFuture<String> recordId = producer.write(record);
      System.out.println("Published message ID: " + recordId.join());
    }
  }
}
```
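
Note that producer.write returns a CompletableFuture<String>, so you do not have to block on join() for every record as the example above does. Below is a minimal sketch of handling the result asynchronously instead; it reuses the client and imports from the example above, and the method name is only illustrative.

```java
// Sketch: handle the write result asynchronously instead of calling join().
public static void writeHRecordAsync(HStreamClient client, String streamName) {
  Producer producer = client.newProducer().stream(streamName).build();
  HRecord hRecord = HRecord.newBuilder().put("id", 10).build();
  Record record = Record.newBuilder().hRecord(hRecord).build();
  producer
      .write(record)
      .thenAccept(recordId -> System.out.println("Wrote record: " + recordId))
      .exceptionally(
          e -> {
            // A failed write completes the future exceptionally; log it and
            // decide whether to retry on your side.
            System.err.println("Write failed: " + e.getMessage());
            return null;
          });
}
```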


# Write Records Using Buffered Producer

In almost all scenarios, we recommend using BufferedProducer whenever possible, because it offers higher throughput and a flexible configuration that lets you trade off throughput against latency as needed. Two settings of BufferedProducer control the flush triggers and the buffer size. With BatchSetting, you determine when a batch is sent, based on the maximum record count, the maximum byte size, and the maximum age of the batch. With FlowControlSetting, you limit the total amount of data the producer may hold. The following example shows how to use BatchSetting to set the triggers that tell the producer when to flush, and FlowControlSetting to limit the maximum number of bytes held by a BufferedProducer.

```java
// WriteDataBufferedExample.java

package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class WriteDataBufferedExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }

    String streamName = "your_h_records_stream_name";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    writeHRecordDataWithBufferedProducers(client, streamName);
    client.close();
  }

  public static void writeHRecordDataWithBufferedProducers(
      HStreamClient client, String streamName) {
    BatchSetting batchSetting =
        BatchSetting.newBuilder()
            // Optional, default: 100. The max record count of a batch;
            // a value <= 0 disables this trigger.
            .recordCountLimit(100)

            // Optional, default: 4096 (4KB). The max byte size of a batch;
            // a value <= 0 disables this trigger.
            .bytesLimit(4096)

            // Optional, default: 100 (ms). The max age of a buffering batch;
            // a value <= 0 disables this trigger.
            .ageLimit(100)
            .build();

    // FlowControlSetting limits the total amount of data held by the producer,
    // including records buffered in batches and records currently being sent
    FlowControlSetting flowControlSetting =
        FlowControlSetting.newBuilder()
            // Optional, default: 104857600 (100MB). Total bytes limit, including buffered
            // batch records and records being sent; the value must be greater than
            // batchSetting.bytesLimit
            .bytesLimit(40960)
            .build();
    BufferedProducer producer =
        client.newBufferedProducer().stream(streamName)
            .batchSetting(batchSetting)
            .flowControlSetting(flowControlSetting)
            .build();

    List<CompletableFuture<String>> recordIds = new ArrayList<>();
    Random random = new Random();

    for (int i = 0; i < 100; i++) {
      double temp = random.nextInt(100) / 10.0 + 15;
      HRecord hRecord = HRecord.newBuilder().put("temperature", temp).build();
      Record record = Record.newBuilder().hRecord(hRecord).build();
      CompletableFuture<String> recordId = producer.write(record);
      recordIds.add(recordId);
    }

    // Closing the producer flushes the remaining buffered records first
    producer.close();
    System.out.println("Wrote message IDs: " + recordIds.stream().map(CompletableFuture::join));
  }
}
```
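
A BufferedProducer only sends data when one of the batch triggers fires, or when it is closed (close() calls flush() first, as noted in the comment above). If you want to force out whatever is currently buffered at a specific point, you can call flush() yourself. Below is a minimal sketch under that assumption; the method name is only illustrative, and it reuses the client setup and imports from the example above.

```java
// Sketch: force buffered records out without waiting for the batch triggers.
public static void writeAndFlush(HStreamClient client, String streamName) {
  BufferedProducer producer = client.newBufferedProducer().stream(streamName).build();
  List<CompletableFuture<String>> recordIds = new ArrayList<>();
  for (int i = 0; i < 10; i++) {
    HRecord hRecord = HRecord.newBuilder().put("temperature", 20.0 + i).build();
    recordIds.add(producer.write(Record.newBuilder().hRecord(hRecord).build()));
  }
  // flush() sends whatever is currently buffered, without waiting for
  // recordCountLimit, bytesLimit, or ageLimit to be reached.
  producer.flush();
  producer.close();
  recordIds.forEach(recordId -> System.out.println("Wrote record: " + recordId.join()));
}
```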


# Write Records with Ordering Keys

Ordering keys are optional. If no key is given, the server automatically assigns a default key. Records with the same ordering key are guaranteed to be written in order when using a BufferedProducer.

Another important feature of HStreamDB, transparent sharding, uses these ordering keys to decide which shard a record is allocated to, which improves write and read performance. See transparent sharding for a more detailed explanation.

You can easily write records with keys using the following example:

```java
// WriteDataWithKeyExample.java

package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class WriteDataWithKeyExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }

    String streamName = "your_h_records_stream_name";

    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    writeHRecordDataWithKey(client, streamName);
    client.close();
  }

  public static void writeHRecordDataWithKey(HStreamClient client, String streamName) {
    // For demonstration, we use the following two ordering keys for the records.
    // As mentioned above, records without an explicit ordering key get a default
    // key and are mapped to a default shard.
    String key1 = "South";
    String key2 = "North";

    // Create a buffered producer with default BatchSetting and FlowControlSetting.
    BufferedProducer producer = client.newBufferedProducer().stream(streamName).build();

    List<CompletableFuture<String>> recordIds = new ArrayList<>();
    Random random = new Random();

    for (int i = 0; i < 100; i++) {
      double temp = random.nextInt(100) / 10.0 + 15;
      Record record;
      if ((i % 3) == 0) {
        HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 1).build();
        record = Record.newBuilder().hRecord(hRecord).orderingKey(key1).build();
      } else {
        HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 2).build();
        record = Record.newBuilder().hRecord(hRecord).orderingKey(key2).build();
      }

      CompletableFuture<String> recordId = producer.write(record);
      recordIds.add(recordId);
    }
    producer.close();
    System.out.println("Wrote message IDs: " + recordIds.stream().map(CompletableFuture::join));
  }
}
```
