MetaQ集群安装测试

发表于:2014-4-22 11:21

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:liuzhoulong    来源:51Testing软件测试网采编

  5,应用例子
package com.test.metaq;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class AsyncConsum {
public static void main(String[] args) {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = "192.168.1.1:2181";
metaClientConfig.setZkConfig(zkConfig);
MessageSessionFactory sessionFactory = null;
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
final String topic = "test";
final String group = "meta-example";
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
try {
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}
public Executor getExecutor() {
return null;
}
});
consumer.completeSubscribe();
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package com.test.metaq;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class Products {
public static void main(String[] args) {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = "192.168.1.1:2181";
metaClientConfig.setZkConfig(zkConfig);
MessageSessionFactory sessionFactory = null;
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
e.printStackTrace();
}
MessageProducer producer = sessionFactory.createProducer();
final String topic = "test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(
System.in));
String line = "qiujinyong";
try {
while ((line = reader.readLine()) != null) {
SendResult sendResult = producer.sendMessage(new Message(topic,
line.getBytes()));
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:"
+ sendResult.getErrorMessage());
} else {
System.out.println("Send message successfully,sent to "
+ sendResult.getPartition());
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
  打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.Products 命令行输入message
  打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.AsyncConsum 命令行会接收到message
22/2<12
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号