产品介绍

如何使用

导航 > 初识产品

初识产品

 

该部分主要描述CEDA各个组件的安装、配置、使用以及代码示例等内容。

 

第一步:CEDA产品环境配置

第二步:导入Sample代码

第三步:初识CEDA Server/Client API

第四步:初识消息中间件AMQ

第五步:初始注册服务器

第六步:初识安全通讯服务器ACS

第七步:初识客户端框架

 

CEDA产品环境配置 操作系统要求

ATF客户端系统要求:Windows XP/Windows 7, .net framework 2.0

sp2/.net framework 3.5sp1

服务器端系统要求:Linux(CentOS5.4 或 RHEL5.4), Java 6

 

开发环境

Java:JDK 6, Eclipse 3.2

C++:VisualStudio 2008

C#:VisualStudio 2008

 

导入Sample代码 Java

从Eclipse导入工程:

 

选择next:

 

选择sample代码所在的文件夹:

 

点击finish, 完成代码导入

 

导入后的工程:

 

C#

安装了Visual Studio 2008后直接点击例子里面的*.csproj文件, 就可以打开sample代码.

 

初识CEDA Server/Client API 下载和安装

API集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面

初始化server

 

server需要实现接口IServerConnectionListener:

publicclass P2PPublisher implements IServerConnectionListener

设置IP地址和端口, 启动server:

// 设置server参数

ServiceInfo serviceInfo = new ServiceInfo();

serviceInfo.setHost(host);

serviceInfo.setPort(port);

// 启动server

serverHandler = ServiceManager.getInstance().startServer(

serviceInfo, this);

消息处理器:

class MessageHandler implements IConnectListener

client验证时创建对应的消息处理器:

publicboolean userValidation(UserInfo arg0, IServerConnection

connHandler) {

MessageHandler msgHandler = new MessageHandler();

connHandler.setListener(msgHandler);

returntrue;

}

publicboolean userValidation(ClientInfo arg0, IServerConnection

connHandler) {

MessageHandler msgHandler = new MessageHandler();

connHandler.setListener(msgHandler);

returntrue;

}

client连接到server

 

client需要实现2个接口:

  1. IEventListener: 监听与server的连接信息
  2. IMessageListener: 从server接收消息

publicclass P2PSubscriber implements IMessageListener, IEventListener

设置参数, 连接到server:

info.setUser("test", "test");ClientInfo info = new ClientInfo();

info.setUser("test", "test");

// 设置服务端的IP地址和端口号

info.setAddress("serverHost", "serverPort");

// 创建连接

IClientConnection conn = ClientConnectionFactory

.createConnection(info);

conn.addEventListener(this);

conn.start();

client订阅消息

// 订阅topic

IClientSession session = conn.createSession();

consumer = session.createConsumer(new Destination(topic));

consumer.addMessageListener(this);

server发布消息

在消息处理器里处理client的订阅与退订:

publicvoid onSubscribe(List<BaseDestination> topicList,

IServerConnection connHandler) {

serverHandler.subscribe(topicList, connHandler);

}

publicvoid onUnSubscribe(List topicList,

IServerConnection connHandler) {

serverHandler.unSubscribe(topicList, connHandler);

}

构建消息并发布:

// 构建消息

Message msg = new Message();

msg.setDestination(new Destination(topic));

msg.getMessageBody().addString((short) 1, content);

msg.getMessageBody().addBytes((short) 2, buf);

// 发送消息

serverHandler.sendMessage(msg);

client request

先订阅了请求消息的topic:

// 订阅topic

IClientSession session = conn.createSession();

MessageConsumer consumer = session.createConsumer(new

Destination(

topic));

consumer.addMessageListener(this);

然后发送请求消息:

// 发送request

MessageSender sender = session.createProducer();

requestCount++;

// 构造request消息

Message msgRequest = new Message();

msgRequest.setDestination(new Destination(topic));

msgRequest.setMessageID(requestCount);

msgRequest.setMessageBody(new MessageBody());

// 设置request参数

msgRequest.getMessageBody().addInt((short) 1, 1);

msgRequest.getMessageBody().addInt((short) 2, 2);

// 发送request消息

sender.send(msgRequest);

在onMessage接口里面获取reply消息:

publicvoid onMessage(Message message) {

if (message != null) {

try {

System.out.printf("Receive a message : id=%d,

result=%d",message.getMessageID(),

message.getMessageBody().getInt((short) 1));

} catch (MessageBodyException e) {

e.printStackTrace();

}

}

}

server reply

在MessageHandler的onMessage接口里面接收client request的消息, 然后生成reply 消息给client:

// 从request消息生成reply消息

Message msgReply = msgRequest.createReplyMessage();

// 获取请求参数

int arg1 = msgRequest.getMessageBody().getInt((short) 1);

int arg2 = msgRequest.getMessageBody().getInt((short) 2);

int result = arg1 + arg2;

// 设置结果

msgReply.getMessageBody().addInt((short) 1, result);

// 返回reply消息

connHandler.sendMessage(msgReply);

示例代码

完整示例代码在目录/example/java/01 cedaapi/下:

 client订阅消息

 ats/example/java/P2PSubscriber.java

 server发布消息

 ats/example/java/P2PPublisher.java

 client request

 ats/example/java/P2PRequestClient.java

 server reply

 ats/example/java/P2PReplyServer.java

 

初识消息中间件AMQ

 

下载和安装

下载和安装最新版本的AMQ,请见下载页面

连接AMQ

使用CEDA api连接到AMQ中间件:

ClientInfo info = new ClientInfo();

info.setUser("test", "test");

// 设置AMQ的IP地址和端口号

info.setAddress(host, port);

// 创建连接

IClientConnection conn = ClientConnectionFactory

.createConnection(info);

conn.addEventListener(this);

conn.start();

订阅消息

创建session和consumer, 订阅消息:

// 订阅topic,接收消息

IClientSession session = conn.createSession();

MessageConsumer consumer = session.createConsumer(new Destination(topic));

consumer.addMessageListener(this);

在onMessage接口处理接收到的消息:

publicvoid onMessage(Message message) {

if (message != null) {

try {

System.out.println("Receive a message : "

+ message.getMessageBody().getString((short) 1));

} catch (MessageBodyException e) {

e.printStackTrace();

}

}

}

发布消息

登录上AMQ后创建session和producer:

// 创建Session和Sender

session = conn.createSession();

sender = session.createProducer();

构建消息, 并发布到AMQ上:

// 构建消息

Message msg = new Message();

msg.setDestination(new Destination(topic));

msg.setMessageBody(new MessageBody());

msg.getMessageBody().addString((short) 1, content);

msg.getMessageBody().addBytes((short) 2, buf);

// 发送消息

sender.send(msg);

示例代码

完整示例代码在目录/example/java/ 02 amqclient/下:

 订阅消息

 ats/example/java/MQPublisher.java

 发布消息

 ats/example/java/MQSubscriber.java

 

初始注册服务器

 

下载和安装

注册服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面

连接注册服务器

为了连接到注册服务器, 需要实现以下2个接口:

  1. IEventListener: 应用和注册服务器的连接状态信息
  2. IClusterListener: 注册应用变动信息

初始化:

serviceInfo = new ServiceInfo();

serviceInfo.setHost("127.0.0.1");

// 使用register server balance模式

serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);

serviceInfo.setName(this.getClass().getSimpleName());

// 监听连接到register server的事件

ServiceManager.getInstance().addRegisterEventListener(this);

目前支持standby(主备机)和load balance(负载均衡)2种模式:

standby模式:

serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);

load balance模式:

serviceInfo.setType(ServiceInfo.SERVICE_TYPE_BLANCE);

创建集群:

// 创建集群

registerClient = ServiceManager.getInstance().createClustClient(

serviceInfo.getName());

// 监听集群事件

registerClient.addListener(this);

连接到注册服务器:

ServiceManager.getInstance().connectRegister(host + ":" + port);

注册服务

在IEventListener的onEvent方法里面监听连接注册服务器事件:

publicvoid onEvent(int nCode) {

switch (nCode) {

case IEventListener.CONNECTION_CONNECTED:

System.out.println("Service: register CONNECTION_CONNECTED");

// 连接后注册服务

registerService();

break;

case IEventListener.CONNECTION_CLOSED:

System.out.println("Service: register CONNECTION_CLOSED");

break;

}

}

连接上注册服务器后, 注册应用提供的服务:

registerClient.registService(serviceInfo);

启动服务 standby模式:

注册服务后, 从IClusterListener的onClustChange里监听所有服务的变动情况(新注册服务, 已注册的服务退出等)

服务变动时, 比较应用自己id与work service的id, 如果相同, 则启动服务:

// 获取work service列表

List infoList = registerClient.getWorkService();

for (ServiceInfo sinfo : infoList) {

// 如果是主server,则启动服务

if (sinfo.getSequenceName().equals(serviceInfo.getSequenceName())) {

// 启动服务

if (!isStart) {

System.out.println("start server: "

+ sinfo.getSequenceName());

isStart = true;

}

}

}

load balance模式:

balance模式在初始化后, 先启动服务, 然后再连接注册服务器注册, 不再需要监听onClustChange事件来启动服务.

示例代码

完整示例代码在目录/example/java/03 RegistryServer/下:

 standby服务器端

 ats/example/java/StandbyServer.java

 balance服务器端

 ats/example/java/BalanceServer.java

 standby客户端

 ats/example/java/StandbyClient.java

 balance客户端

 ats/example/java/BalanceClient.java

 

初识安全通讯服务器ACS

 

下载和安装

安全通讯服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面

ACS作为消息中间件

ACS可以作为消息中间件, 类似于AMQ功能. 消息订阅者和发布者需要以http方式连接上:

ClientInfo info = new ClientInfo();

info.setUser("test", "test");

// 设置ACS的url和端口号

info.setAddress(host, port);

// 设置连接协议,ACS使用http/https

info.setProtocol(ClientInfo.PROTOCOL_HTTP);

ACS的host是一个url.

ACS作为通讯中转服务器

后台服务注册到registry server, 配置ACS连接到相同的registry server, 并且该服务配置到ACS的服务列表里面:

[Register]

#registry server地址,多个地址用“,”分隔

Address=192.168.1.111:2182

 

#需要连接的Service

RequestService=ACSReplyServer,ACSPublishServer

client通过http方式连接上ACS server, 在建立连接的时候, 设置需要连接的后台服务:

// 创建连接

conn = ClientConnectionFactory.createConnection(info);

// 设置要连接的服务名称

conn.setMQServer(ACSReplyServer.class.getSimpleName());

conn.addEventListener(this);

conn.start();

以后该连接调用的服务就是后台对应的服务.

示例代码

完整示例代码在目录/example/java/ 04 ACS/下:

 作为消息中间件的订阅/发布例子

 ats/example/java/ACSPublisher.java

 ats/example/java/ACSSubscriber.java

 作为中转服务器的request/reply例子

 ats/example/java/ACSReplyServer.java

 ats/example/java/ACSRequestClient.java

 作为中转服务器的订阅/发布例子

 ats/example/java/ACSPublishServer.java

 ats/example/java/ACSSubscribeClient.java

 

初识客户端框架

 

下载和安装

客户端框架集成在CEDA的安装包中,下载请见CEDA安装包的下载页面, 客户端框架中CEDA安装包根目录下/atf文件夹下.

配置

需要根据服务器配置修改/atf/control/SampleLocalEnvSetting.xml中的服务器地址:

运行

双击/atf文件夹下的AtfLite.exe就可以打开框架:

编写插件(Plugin) SimplePlugin

 

插件开发
  1. Visual Studio新建一个Windows Forms的class library工程
  2. 引用必需的dll(可知/atf/bin下找到):

    Atf.Plugin.dll

    DevExpress.Data.v12.1.dll

    DevExpress.Utils.v12.1.dll

    DevExpress.XtraEditors.v12.1.dll

  3. 新建一个UserControl, 把继承的类由UserControl改成Atf.Plugin.Imp.IPluginDevImpl

 

  1. 布局界面, 界面逻辑编码等
    插件集成到客户端框架
  2. 在/atf/control/AppModules.xml里面增加一个plugin:
  3. 在/atf/control/AppModules.xml里面增加一个module:
  4. 运行框架, 就可以看到插件在左边侧栏里面的模块下面:
  5. 双击打开插件:

 

Market Data Client

 

这是一个初始化使用request/reply从后台获取数据, 后续通过订阅/发布消息实现更新的例子. 后台服务代码见目录/example/java/ 05MarketDataServer/.

  1. 界面布局:

    中间白色的地方是一个DataViewGrid

  2. 点击获取价格, 先创建消息处理器MessageHandler,订阅消息:

    if (msgHandler == null) {

    msgHandler = MQIIIManager.Instance.RegisterHandler(this.serverName, this.GetType().Name);

    msgHandler.MQMessage += newMessageHandler.OnMQMessageDelegate(msgHandler_MQMessage);

    }

    //订阅主题

    msgHandler.SubscribeTopics(newstring[] /p>

    { this.topic });

    然后启动线程, 从后台获取初始化数据:

    //获取初始化数据,需要在线程里进行

    ThreadPool.QueueUserWorkItem(newWaitCallback(RequestInitData));

    使用request/reply方式从后台获取初始化数据:

    //构建请求消息

    com.adaptiveMQ2.message.Message msg = new com.adaptiveMQ2.message.Message();

    msg.Destination = new com.adaptiveMQ2.message.BaseDestination(this.topic);

    msg.MessageBody = new com.adaptiveMQ2.message.MessageBody();

    msg.SvrID = this.serverName;

    //请求数据

    com.adaptiveMQ2.message.Message replyMsg = MQIIIManager.Instance.RequestMessage(this.serverName, msg);

    //处理返回的数据

    OnData(replyMsg);

  3. 订阅的实时消息到来时的处理:

    ///<summary>

    ///处理订阅到实时数据

    ///</summary>

    ///<param name="appName"></param>

    ///<param name="topic"></param>

    ///<param name="msg"></param>

    void msgHandler_MQMessage(string appName, string topic, com.adaptiveMQ2.message.Message msg) {

    //处理数据

    OnData(msg);

    }

沪公网安备 31011502002921号