packagepulsarimport("context""fmt""io""github.com/testcontainers/testcontainers-go""github.com/testcontainers/testcontainers-go/wait")typepulsarContainerstruct{testcontainers.ContainerURIstring}funcsetupPulsar(ctxcontext.Context)(*pulsarContainer,error){matchAdminResponse:=func(rio.Reader)bool{respBytes,_:=io.ReadAll(r)resp:=string(respBytes)returnresp==`["standalone"]`}pulsarRequest:=testcontainers.ContainerRequest{Image:"docker.io/apachepulsar/pulsar:2.10.2",ExposedPorts:[]string{"6650/tcp","8080/tcp"},WaitingFor:wait.ForAll(wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse),wait.ForLog("Successfully updated the policies on namespace public/default"),),Cmd:[]string{"/bin/bash","-c","/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss",},}c,err:=testcontainers.GenericContainer(ctx,testcontainers.GenericContainerRequest{ContainerRequest:pulsarRequest,Started:true,})iferr!=nil{returnnil,err}c.StartLogProducer(ctx)deferc.StopLogProducer()lc:=logConsumer{}c.FollowOutput(&lc)pulsarPort,err:=c.MappedPort(ctx,"6650/tcp")iferr!=nil{returnnil,err}return&pulsarContainer{Container:c,URI:fmt.Sprintf("pulsar://127.0.0.1:%v",pulsarPort.Int()),},nil}typelogConsumerstruct{}func(lc*logConsumer)Accept(ltestcontainers.Log){fmt.Print(string(l.Content))}
packagepulsarimport("context""fmt""testing""time""github.com/apache/pulsar-client-go/pulsar")funcTestPulsar(t*testing.T){ctx,cancel:=context.WithCancel(context.Background())defercancel()c,err:=setupPulsar(ctx)iferr!=nil{t.Fatal(err)}pc,err:=pulsar.NewClient(pulsar.ClientOptions{URL:c.URI,OperationTimeout:30*time.Second,ConnectionTimeout:30*time.Second,})iferr!=nil{t.Fatal(err)}t.Cleanup(func(){pc.Close()})consumer,err:=pc.Subscribe(pulsar.ConsumerOptions{Topic:"test-topic",SubscriptionName:"pulsar-test",Type:pulsar.Exclusive,})iferr!=nil{t.Fatal(err)}t.Cleanup(func(){consumer.Close()})msgChan:=make(chan[]byte)gofunc(){msg,err:=consumer.Receive(ctx)iferr!=nil{fmt.Println("failed to receive message",err)return}msgChan<-msg.Payload()consumer.Ack(msg)}()producer,err:=pc.CreateProducer(pulsar.ProducerOptions{Topic:"test-topic",})iferr!=nil{t.Fatal(err)}producer.Send(ctx,&pulsar.ProducerMessage{Payload:[]byte("hello world"),})ticker:=time.NewTicker(1*time.Minute)select{case<-ticker.C:t.Fatal("did not receive message in time")casemsg:=<-msgChan:ifstring(msg)!="hello world"{t.Fatal("received unexpected message bytes")}}}