Rabbitmq和Kafka简单的性能测试

发表于:2016-3-09 11:50

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:时萌    来源:51Testing软件测试网采编

  测试环境:ubuntu 15.10 64位
  cpu:inter core i7-4790 3.60GHZ * 8
  内存:16GB
  硬盘:ssd 120GB
  软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)
  PS: 测试结果均为单操作测试,即生产的时候没有消费操作
  测试结果:
  kafka :消费速度: 37,586 /s  生产速度: 448,753 /s
  rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s
  出现问题:
  rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。
  rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”
  其他并无任何问题
  结论:
  很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。
  代码:
kafka:
package main
import (
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"log"
"time"
)
func main() {
go producer()
//    go consumer()
time.Sleep(10*time.Minute)
}
func producer()  {
config :=sarama.NewConfig()
config.Producer.Return.Successes = true
proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
if err != nil {
panic(err)
}
signals :=make(chan  os.Signal,1)
signal.Notify(signals,os.Interrupt)
var (
wg                          sync.WaitGroup
enqueued, successes, errors int
)
wg.Add(1)
go func() {
defer  wg.Done()
for _=range proder.Successes(){
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range proder.Errors(){
log.Println(err)
errors++
}
}()
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(enqueued)
}
}()
ProducerLoop:
for{
message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
select {
case proder.Input() <- message:
enqueued++
case <- signals:
proder.AsyncClose()
break ProducerLoop
}
}
wg.Wait()
log.Println("Successfully produced:%d;errors:%d\n",successes,errors)
}
func consumer()  {
coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
if err != nil {
panic(err)
}
defer func() {
if err :=coner.Close(); err !=nil{
log.Fatalln(err)
}
}()
partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close();err!=nil{
log.Fatalln(err)
}
}()
signals := make(chan os.Signal,1)
signal.Notify(signals,os.Interrupt)
consumed:=0
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(consumed)
}
}()
ConsumerLoop:
for{
select {
case _ = <-partitionConsumer.Messages():
consumed++
//            log.Println( string(msg.Value),"  =>  ",consumed)
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
}
21/212>
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号