Apache Pulsar

Apache Pulsar is a distributed messaging and event streaming platform designed for high-throughput, low-latency data processing across multiple topics.

Add the following dependency to your project file:

NuGet
dotnet add package Testcontainers.Pulsar

You can start an Apache Pulsar container instance from any .NET application. In this example, each configuration class creates its own container instance and passes it to a shared base test class, so the same tests run against different container configurations.

[UsedImplicitly]
public sealed class PulsarDefaultConfiguration : PulsarContainerTest
{
    public PulsarDefaultConfiguration()
        : base(new PulsarBuilder().Build(), false)
    {
    }
}

This example uses xUnit.net's IAsyncLifetime interface to manage the lifecycle of the container. The container is started in the InitializeAsync method before the test method runs, ensuring that the environment is ready for testing. After the test completes, the container is removed in the DisposeAsync method.

public Task InitializeAsync()
{
    return _pulsarContainer.StartAsync();
}

public Task DisposeAsync()
{
    return _pulsarContainer.DisposeAsync().AsTask();
}

[Fact]
public async Task ConsumerReceivesSendMessage()
{
    // Given
    const string helloPulsar = "Hello, Pulsar!";

    var topic = $"persistent://public/default/{Guid.NewGuid():D}";

    var name = Guid.NewGuid().ToString("D");

    var clientBuilder = PulsarClient.Builder().ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress()));

    if (_authenticationEnabled)
    {
        var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(Timeout.InfiniteTimeSpan);
        _ = clientBuilder.Authentication(new TokenAuthentication(authToken));
    }

    var client = clientBuilder.Build();

    await using var producer = client.NewProducer(Schema.String)
        .Topic(topic)
        .Create();

    await using var consumer = client.NewConsumer(Schema.String)
        .Topic(topic)
        .SubscriptionName(name)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .Create();

    // When
    _ = await producer.Send(helloPulsar)
        .ConfigureAwait(true);

    var message = await consumer.Receive()
        .ConfigureAwait(true);

    // Then
    Assert.Equal(helloPulsar, Encoding.Default.GetString(message.Data));
}
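
The snippets above rely on the fields `_pulsarContainer` and `_authenticationEnabled` without showing where they come from. A minimal sketch of how the base class and a second configuration might fit together is shown below. It assumes xUnit.net v2 (where `IAsyncLifetime` returns `Task`), and it assumes a base constructor shape taken from the `base(..., false)` call earlier. `PulsarAuthConfiguration` is an illustrative name, not necessarily one used in the test projects.

```csharp
using System.Threading.Tasks;
using Testcontainers.Pulsar;
using Xunit;

public abstract class PulsarContainerTest : IAsyncLifetime
{
    private readonly PulsarContainer _pulsarContainer;

    // Tells the test whether it must request a token before connecting.
    private readonly bool _authenticationEnabled;

    // Assumed constructor shape: each configuration supplies its own container.
    protected PulsarContainerTest(PulsarContainer pulsarContainer, bool authenticationEnabled)
    {
        _pulsarContainer = pulsarContainer;
        _authenticationEnabled = authenticationEnabled;
    }

    public Task InitializeAsync()
    {
        return _pulsarContainer.StartAsync();
    }

    public Task DisposeAsync()
    {
        return _pulsarContainer.DisposeAsync().AsTask();
    }

    // Test methods such as ConsumerReceivesSendMessage go here.
}

// Hypothetical second configuration: same tests, token authentication enabled.
public sealed class PulsarAuthConfiguration : PulsarContainerTest
{
    public PulsarAuthConfiguration()
        : base(new PulsarBuilder().WithTokenAuthentication().Build(), true)
    {
    }
}
```

With this layout, the test runner executes every test in the base class once per derived configuration class.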

The test example uses the following NuGet dependencies:

<PackageReference Include="Microsoft.NET.Test.Sdk"/>
<PackageReference Include="coverlet.collector"/>
<PackageReference Include="xunit.runner.visualstudio"/>
<PackageReference Include="xunit"/>
<PackageReference Include="DotPulsar"/>

To execute the tests, use the command dotnet test from a terminal.

Tip

For the complete source code of this example and additional information, please refer to our test projects.

Access Apache Pulsar

Get the Pulsar broker URL:

string pulsarBrokerUrl = _pulsarContainer.GetBrokerAddress();

Get the Pulsar HTTP service URL:

string pulsarServiceUrl = _pulsarContainer.GetServiceAddress();

Enable token authentication

If you need to use token authentication, use the following builder configuration to enable authentication:

PulsarContainer _pulsarContainer = new PulsarBuilder().WithTokenAuthentication().Build();

Start the container, then obtain an authentication token with a specified expiration time:

var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(TimeSpan.FromHours(1))
    .ConfigureAwait(false);

Alternatively, create a token that never expires:

var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(Timeout.InfiniteTimeSpan)
    .ConfigureAwait(false);

Enable Pulsar Functions

If you need to use Pulsar Functions, use the following builder configuration to enable them:

PulsarContainer _pulsarContainer = new PulsarBuilder().WithFunctions().Build();