Skip to content

Pubsub

package pubsub

import (
    "context"
    "fmt"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

// pubsubContainer represents the pubsub container type used in the module.
// It embeds testcontainers.Container, so the container's own methods (for
// example Terminate) can be called directly on a pubsubContainer value.
type pubsubContainer struct {
    testcontainers.Container
    // URI is the "host:port" address that setupPubsub builds from the
    // container host and the host port mapped to the container's port 8085.
    URI string
}

// setupPubsub creates and starts an instance of the pubsub container type.
//
// The container runs `gcloud beta emulators pubsub start` bound to
// 0.0.0.0:8085, exposes 8085/tcp, and startup waits for the log to contain
// "started". The returned pubsubContainer carries the "host:port" address on
// which the container's port 8085 is reachable in its URI field.
//
// On success the caller owns the container and is responsible for calling
// Terminate on it. If resolving the address fails after the container has
// started, the container is terminated before the error is returned so that
// it is not leaked (the caller never receives a handle in that case).
func setupPubsub(ctx context.Context) (*pubsubContainer, error) {
    req := testcontainers.ContainerRequest{
        Image:        "gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators",
        ExposedPorts: []string{"8085/tcp"},
        WaitingFor:   wait.ForLog("started"),
        Cmd: []string{
            "/bin/sh",
            "-c",
            "gcloud beta emulators pubsub start --host-port 0.0.0.0:8085",
        },
    }
    container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    if err != nil {
        return nil, fmt.Errorf("starting pubsub emulator container: %w", err)
    }

    uri, err := pubsubURI(ctx, container)
    if err != nil {
        // The container is already running; without this the caller would
        // get (nil, err) and have no way to stop it.
        if termErr := container.Terminate(ctx); termErr != nil {
            return nil, fmt.Errorf("%w (terminating container also failed: %v)", err, termErr)
        }
        return nil, err
    }

    return &pubsubContainer{Container: container, URI: uri}, nil
}

// pubsubURI returns the "host:port" address for the container's port 8085,
// combining the container host with the host port mapped to 8085.
func pubsubURI(ctx context.Context, container testcontainers.Container) (string, error) {
    mappedPort, err := container.MappedPort(ctx, "8085")
    if err != nil {
        return "", fmt.Errorf("resolving mapped port for 8085: %w", err)
    }

    hostIP, err := container.Host(ctx)
    if err != nil {
        return "", fmt.Errorf("resolving container host: %w", err)
    }

    return fmt.Sprintf("%s:%s", hostIP, mappedPort.Port()), nil
}
package pubsub

import (
    "cloud.google.com/go/pubsub"
    "context"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "testing"
)

// TestPubsub performs a publish/receive round trip against the container
// started by setupPubsub: it creates a topic and a subscription, publishes
// "Hello World", and asserts that the subscription receives the same payload.
func TestPubsub(t *testing.T) {
    ctx := context.Background()

    container, err := setupPubsub(ctx)
    if err != nil {
        t.Fatal(err)
    }

    // Clean up the container after the test is complete
    t.Cleanup(func() {
        if err := container.Terminate(ctx); err != nil {
            t.Fatalf("failed to terminate container: %s", err)
        }
    })

    // The container endpoint is plain-text gRPC, so connect without TLS.
    conn, err := grpc.Dial(container.URI, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatal(err)
    }
    options := []option.ClientOption{option.WithGRPCConn(conn)}
    client, err := pubsub.NewClient(ctx, "my-project-id", options...)
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    topic, err := client.CreateTopic(ctx, "greetings")
    if err != nil {
        t.Fatal(err)
    }
    // Release the topic's publish resources once the test is done; runs
    // before client.Close because defers execute in reverse order.
    defer topic.Stop()

    subscription, err := client.CreateSubscription(ctx, "subscription",
        pubsub.SubscriptionConfig{Topic: topic})
    if err != nil {
        t.Fatal(err)
    }
    result := topic.Publish(ctx, &pubsub.Message{Data: []byte("Hello World")})
    if _, err = result.Get(ctx); err != nil {
        t.Fatal(err)
    }

    // perform assertions
    var data []byte
    cctx, cancel := context.WithCancel(ctx)
    // Ensure the context is released on every exit path, not only when a
    // message happens to be delivered.
    defer cancel()
    err = subscription.Receive(cctx, func(_ context.Context, m *pubsub.Message) {
        data = m.Data
        m.Ack()
        // Stop receiving after the first message so Receive returns.
        cancel()
    })
    if err != nil {
        t.Fatalf("receiving message: %s", err)
    }
    if string(data) != "Hello World" {
        t.Fatalf("Expected value %s. Got %s.", "Hello World", data)
    }
}