About Pulsar

Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.

Learn more about Pulsar at

About this library

This library is a wrapper on top of pulsar-client-go. It makes producing to and consuming from Pulsar topics easy by hiding most of the client's complexity. The library also provides an admin API for Pulsar administration.


Import the following module

import ""

Initialize the library. This needs to be done only once in the lifetime of your process.

workerCount := 10 //Number of consumer goroutines
pulsarHost := "pulsar.local" //Pulsar host URL
err := pulsarlib.InitMessaging(workerCount, pulsarHost)
if err != nil {
    panic("Error initializing the library")
}
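Because initialization must happen only once per process, one option is to guard the call with `sync.Once` so that several packages can safely request it. The following is a minimal sketch under stated assumptions: `initMessaging` is a hypothetical stand-in for `pulsarlib.InitMessaging` so the example compiles on its own, and `ensureMessaging` is an illustrative helper, not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// initCalls records how often the stand-in initializer actually ran.
var initCalls int

// initMessaging is a hypothetical stand-in for pulsarlib.InitMessaging.
// Replace it with the real call in your own code.
func initMessaging(workerCount int, pulsarHost string) error {
	initCalls++
	if pulsarHost == "" {
		return fmt.Errorf("pulsar host must not be empty")
	}
	return nil
}

var (
	initOnce sync.Once
	initErr  error
)

// ensureMessaging runs the initialization at most once per process and
// returns the result of that first attempt on every call.
func ensureMessaging(workerCount int, pulsarHost string) error {
	initOnce.Do(func() {
		initErr = initMessaging(workerCount, pulsarHost)
	})
	return initErr
}

func main() {
	for i := 0; i < 3; i++ {
		if err := ensureMessaging(10, "pulsar.local"); err != nil {
			panic(err)
		}
	}
	fmt.Println("initMessaging calls:", initCalls) // prints "initMessaging calls: 1"
}
```

Note that with this pattern a failed first attempt is also remembered, so a process that cannot initialize should normally exit rather than retry.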

Publishing messages

Create the Producer using the following code

tenantID := "my-tenant" //Name of the tenant
namespace := "my_namespace" //Name of the namespace
topic := "my_topic" //Topic to which the messages need to be published
producer, err := pulsarlib.CreateProducer(tenantID, namespace, topic)
if err != nil {
    panic("Unable to create producer")
}

//Two dummy messages
messages := []*pulsarlib.Message{
    {
        Key: "message-1",
        Value: []byte("This is the first message"),
        Properties: map[string]string{
            "message-type": "string",
        },
    },
    {
        Key: "message-2",
        Value: []byte("This is the second message"),
        Properties: map[string]string{
            "message-type": "string",
        },
    },
}

//Publish the messages
err = producer.Publish(messages)
if err != nil {
    panic("Error publishing messages")
}

stats := producer.Stats()
fmt.Println("Published messages stats:", stats)

//Cleanup the producer
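For orientation, the tenant, namespace, and topic arguments correspond to the three path segments of a fully qualified Pulsar topic name, which has the form `domain://tenant/namespace/topic`. The sketch below only illustrates that naming scheme. The `topicName` helper is hypothetical, and the choice of the `persistent` domain is an assumption; how this library composes the name internally is not shown here.

```go
package main

import "fmt"

// topicName builds a fully qualified Pulsar topic name. The "persistent"
// domain is an assumption; Pulsar also supports "non-persistent".
func topicName(tenantID, namespace, topic string) string {
	return fmt.Sprintf("persistent://%s/%s/%s", tenantID, namespace, topic)
}

func main() {
	fmt.Println(topicName("my-tenant", "my_namespace", "my_topic"))
	// prints "persistent://my-tenant/my_namespace/my_topic"
}
```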

Consuming messages

Create the consumer using the following code

//Create a handler. It needs to implement the pulsarlib.Handler interface.
type myHandler struct{}
func (h *myHandler) HandleMessage(msg *pulsarlib.Message) {
    fmt.Printf("Message consumed. Key [%s] Value [%s] Properties [%v]\n", msg.Key, msg.Value, msg.Properties)
}

tenantID := "my-tenant" //Name of the tenant
namespace := "my_namespace" //Name of the namespace
topics := []string{"my-topic1"} //Can consume from multiple topics within a namespace
subscriptionName := "my-subscription"
handler := &myHandler{}
opts := pulsarlib.ConsumerOpts{
    SubscriptionName: subscriptionName,
}
consumer, err := pulsarlib.CreateConsumer(tenantID, namespace, topics, handler, opts)
if err != nil {
    panic("Error creating consumer")
}

//Start the consumer
err = consumer.Start()
if err != nil {
    panic("Error starting consumer")
}

//To pause the consumer

//To unpause the consumer

//Stopping the consumer
err = consumer.Stop()
if err != nil {
    panic("Error stopping consumer")
}

//If you want to delete the subscription
err = consumer.Unsubscribe()
if err != nil {
    panic("Error unsubscribing")
}

stats := consumer.Stats()
fmt.Println("Consumed messages stats:", stats)
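Since the library is initialized with a number of consumer goroutines, it seems prudent to assume that `HandleMessage` may be called concurrently and to protect any state the handler keeps. The sketch below shows one way to do that with a mutex. `Message` and `Handler` are local stand-ins that mirror the shapes used in this README, not the library's own types, and `deliver` merely simulates a pool of workers.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Handler are local stand-ins mirroring pulsarlib.Message and
// pulsarlib.Handler as they appear in this README.
type Message struct {
	Key        string
	Value      []byte
	Properties map[string]string
}

type Handler interface {
	HandleMessage(msg *Message)
}

// countingHandler counts messages per key. The mutex protects the map in
// case HandleMessage is invoked from several goroutines at once.
type countingHandler struct {
	mu     sync.Mutex
	counts map[string]int
}

func newCountingHandler() *countingHandler {
	return &countingHandler{counts: make(map[string]int)}
}

func (h *countingHandler) HandleMessage(msg *Message) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.counts[msg.Key]++
}

// Count returns how many messages with the given key have been handled.
func (h *countingHandler) Count(key string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.counts[key]
}

// deliver simulates workerCount goroutines, each handing perWorker messages
// to the handler, roughly as a pool of consumer goroutines might.
func deliver(h Handler, workerCount, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				h.HandleMessage(&Message{Key: "message-1", Value: []byte("payload")})
			}
		}()
	}
	wg.Wait()
}

func main() {
	h := newCountingHandler()
	deliver(h, 10, 100)
	fmt.Println(h.Count("message-1")) // prints 1000
}
```

A stateless handler, like the `myHandler` example, needs no locking at all; the mutex matters only once the handler accumulates shared state.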


This library provides the following admin APIs:

//Available APIs
CreateTenant(tenantID string, adminRoles []string, allowedClusters []string) error
DeleteTenant(tenantID string) error
CreateNamespace(tenantID string, namespace string) error
DeleteNamespace(tenantID string, namespace string) error
UnloadNamespace(tenantID string, namespace string) error
ListTenants() ([]string, error)
ListNamespaces(tenantID string) ([]string, error)
CreateTopic(tenantID string, namespace string, topic string) error
DeleteTopic(tenantID string, namespace string, topic string) error
ListTopics(tenantID string, namespace string) ([]string, error)
CreatePartitionedTopic(tenantID string, namespace string, topic string, partitions int) error
DeletePartionedTopic(tenantID string, namespace string, topic string) error
ListPartionedTopics(tenantID string, namespace string) ([]string, error)

To use an API, call the function with the required parameters. For example:

pulsarTenants, err := pulsarlib.ListTenants()
if err != nil {
    panic("Unable to list tenants")
}
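The admin calls can also be combined. As an illustration, the sketch below creates a tenant only when `ListTenants` does not already report it. It is written against a small hypothetical interface, `tenantAdmin`, whose two methods copy the signatures listed above; the library exposes these as package-level functions, so real code would call them directly or through a thin adapter. `fakeAdmin` is an in-memory stand-in so the example runs without a Pulsar cluster, and the role and cluster names are placeholders.

```go
package main

import "fmt"

// tenantAdmin captures the two admin calls this sketch needs. It is a
// hypothetical interface introduced only for this example.
type tenantAdmin interface {
	ListTenants() ([]string, error)
	CreateTenant(tenantID string, adminRoles []string, allowedClusters []string) error
}

// ensureTenant creates the tenant only if it is not already listed and
// reports whether it had to create it.
func ensureTenant(admin tenantAdmin, tenantID string, adminRoles, allowedClusters []string) (bool, error) {
	tenants, err := admin.ListTenants()
	if err != nil {
		return false, fmt.Errorf("listing tenants: %w", err)
	}
	for _, t := range tenants {
		if t == tenantID {
			return false, nil
		}
	}
	if err := admin.CreateTenant(tenantID, adminRoles, allowedClusters); err != nil {
		return false, fmt.Errorf("creating tenant %q: %w", tenantID, err)
	}
	return true, nil
}

// fakeAdmin is an in-memory stand-in used only for this example.
type fakeAdmin struct{ tenants []string }

func (f *fakeAdmin) ListTenants() ([]string, error) { return f.tenants, nil }

func (f *fakeAdmin) CreateTenant(tenantID string, _ []string, _ []string) error {
	f.tenants = append(f.tenants, tenantID)
	return nil
}

func main() {
	admin := &fakeAdmin{tenants: []string{"public"}}
	created, err := ensureTenant(admin, "my-tenant", []string{"admin"}, []string{"standalone"})
	fmt.Println(created, err) // prints "true <nil>"
	created, err = ensureTenant(admin, "my-tenant", nil, nil)
	fmt.Println(created, err) // prints "false <nil>"
}
```

The check and the create are two separate calls, so two processes running this at the same time could both attempt the create; treat it as a convenience, not a lock.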