Skip to content

Streaming RPC

protoc-gen-nats-micro supports all three protobuf streaming patterns over NATS. Streaming methods are automatically detected from your proto definitions — no extra annotations needed.

Streaming Patterns

PatternProto SyntaxDescription
Server-streamingrpc Foo(Req) returns (stream Resp)Client sends one request, server sends many responses
Client-streamingrpc Foo(stream Req) returns (Resp)Client sends many requests, server responds once
Bidirectionalrpc Foo(stream Req) returns (stream Resp)Both sides send and receive concurrently

Proto Definition

protobuf
service StreamDemoService {
  option (natsmicro.service) = {
    subject_prefix: "api.v1.stream"
  };

  // Standard unary — business as usual
  rpc Ping(PingRequest) returns (PingResponse) {}

  // Server-streaming — server sends N responses
  rpc CountUp(CountUpRequest) returns (stream CountUpResponse) {}

  // Client-streaming — client sends N requests
  rpc Sum(stream SumRequest) returns (SumResponse) {}

  // Bidi — both sides stream concurrently
  rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}

Wire Protocol

Streaming uses NATS pub/sub with custom headers for flow control. No JetStream required.

HeaderDirectionPurpose
Reply-ToClient → ServerClient's inbox for receiving streamed messages
Nats-Stream-InboxServer → ClientServer's inbox (for client-streaming and bidi)
Nats-Stream-SeqServer → ClientSequence number for ordered delivery
Nats-Stream-EndServer → Client"true" signals end-of-stream
Status / DescriptionServer → ClientError info on the end-of-stream message

Server Implementation

Server-Streaming

The handler receives a typed stream with a Send() method:

go
func (s *myService) CountUp(ctx context.Context, req *CountUpRequest, stream *StreamDemoService_CountUp_Stream) error {
    for i := int32(0); i < req.Count; i++ {
        if err := stream.Send(&CountUpResponse{
            Number: req.Start + i,
        }); err != nil {
            return err
        }
    }
    return nil // stream automatically closed on return
}

Client-Streaming

The handler receives a stream with Recv() and returns a final response:

go
func (s *myService) Sum(ctx context.Context, stream *StreamDemoService_Sum_Stream) (*SumResponse, error) {
    var total int64
    var count int32
    for {
        msg, err := stream.Recv(ctx)
        if err != nil {
            break // EOF or stream ended
        }
        total += msg.Value
        count++
    }
    return &SumResponse{Total: total, Count: count}, nil
}

Bidirectional

The handler gets a stream with both Send() and Recv():

go
func (s *myService) Chat(ctx context.Context, stream *StreamDemoService_Chat_Stream) error {
    for {
        msg, err := stream.Recv(ctx)
        if err != nil {
            break
        }
        stream.Send(&ChatMessage{
            User: "server",
            Text: "echo: " + msg.Text,
        })
    }
    return nil
}

Client Usage

Server-Streaming

go
stream, err := client.CountUp(ctx, &CountUpRequest{Start: 1, Count: 5})
if err != nil { /* handle */ }

for {
    resp, err := stream.Recv(ctx)
    if err != nil { break } // EOF
    fmt.Println(resp.Number)
}
stream.Close()

Client-Streaming

go
stream, err := client.Sum(ctx)
if err != nil { /* handle */ }

stream.Send(&SumRequest{Value: 10})
stream.Send(&SumRequest{Value: 20})
stream.Send(&SumRequest{Value: 30})

result, err := stream.CloseAndRecv(ctx)
fmt.Println(result.Total) // 60

Bidirectional

go
stream, err := client.Chat(ctx)
if err != nil { /* handle */ }

stream.Send(&ChatMessage{User: "me", Text: "hello"})
reply, _ := stream.Recv(ctx) // "echo: hello"

stream.Send(&ChatMessage{User: "me", Text: "bye"})
reply, _ = stream.Recv(ctx)  // "echo: bye"

stream.CloseSend()

Chunked Blob Helpers

For simple blob transfer over streaming RPC, annotate the method with chunked_io and use a single-field bytes chunk message:

protobuf
message SnapshotChunk {
  bytes data = 1;
}

rpc ExportSnapshot(ExportSnapshotRequest) returns (stream SnapshotChunk) {
  option (natsmicro.chunked_io) = {};
}

rpc ImportSnapshot(stream SnapshotChunk) returns (ImportSnapshotResponse) {
  option (natsmicro.chunked_io) = {
    default_chunk_size: 131072
  };
}

Constraints:

  • Bidirectional methods are intentionally rejected.
  • Chunk messages must stay simple: exactly one bytes field, with metadata kept in the request or final response.

Generated Helpers by Language

Go — full download and upload helpers:

  • Download: RecvBytes(ctx), RecvToWriter(ctx, w), RecvToFile(ctx, path)RecvToFile writes atomically (temp file + rename); no partial file is left on error.
  • Upload: SendBytes(data), SendReader(r, chunkSize), SendFile(path, chunkSize) — upload helpers are stream-first; on error some chunks may have already been transmitted.

TypeScript — download and upload helpers available:

  • Download: recvBytes() — drains the stream into a single Uint8Array.
  • Upload: sendBytes(data: Uint8Array) — wraps raw bytes into the chunk message and sends it via the client-streaming sender.

Python — download and upload helpers:

  • Download: recv_bytes() — drains the stream into a single bytes object.
  • Upload: send_bytes(data: bytes) — wraps raw bytes into the chunk message and sends it via the client-streaming sender.

Examples

Go:

go
download, err := client.ExportSnapshot(ctx, &ExportSnapshotRequest{Id: "snap-1"})
if err != nil { /* handle */ }
if err := download.RecvToFile(ctx, "/tmp/snapshot.bin"); err != nil { /* handle */ }

upload, err := client.ImportSnapshot(ctx)
if err != nil { /* handle */ }
if err := upload.SendFile("/tmp/snapshot.bin", 0); err != nil { /* handle */ }
resp, err := upload.CloseAndRecv(ctx)
_ = resp

TypeScript (download):

typescript
const stream = await client.exportSnapshot(new ExportSnapshotRequest({ id: 'snap-1' }));
const data: Uint8Array = await stream.recvBytes();

TypeScript (upload):

typescript
const sender = await client.importSnapshot();
sender.sendBytes(chunk1);
sender.sendBytes(chunk2);
const response = await sender.closeAndRecv();

Python:

python
stream = await client.export_snapshot(ExportSnapshotRequest(id="snap-1"))
data: bytes = await stream.recv_bytes()

Stream Types Reference

Server-Side Stream (Send-only)

MethodDescription
Send(msg) errorSend a typed message to the client
Close() errorSend end-of-stream marker
CloseWithError(code, msg) errorSend error + end-of-stream

Client-Side Stream (Recv-only)

MethodDescription
Recv(ctx) (*T, error)Block until next message or EOF
Close() errorUnsubscribe from stream

Bidi Stream

MethodDescription
Send(msg) errorSend to the other side
Recv(ctx) (*T, error)Receive from the other side
CloseSend() errorSignal end of sending
CloseRecv() errorUnsubscribe from receiving

Language Support

FeatureGoTypeScriptPython
Server-streaming (service)
Server-streaming (client)
Client-streaming
Bidi-streaming
Chunked I/O (download)
Chunked I/O (upload)

TIP

Check out the streaming-go example for a complete working demo of all four RPC patterns.

Released under the MIT License.