大战熟女丰满人妻av-荡女精品导航-岛国aaaa级午夜福利片-岛国av动作片在线观看-岛国av无码免费无禁网站-岛国大片激情做爰视频

JMS&ActiveMQ教程
基于JMS的消息傳送
ActiveMQ與Spring集成
ActiveMQ與SpringBoot集成
ActiveMQ安全機制
ActiveMQ主從集群

ActiveMQ主從集群

什么是集群

集群就是將相同的程序、功能,部署在兩臺或多臺服務(wù)器上,這些服務(wù)器對外提供的功能是完全一樣的。

集群是通過不斷橫向擴展增加服務(wù)器的方式,以提高服務(wù)的能力。

為什么需要集群

● 集群可以解決單點故障問題

● 集群可以提高系統(tǒng)的可用性

● 集群可以提高系統(tǒng)的服務(wù)能力

ActiveMQ主從集群方式

1、shared filesystem Master-Slave方式主從集群

通過共享存儲目錄(kahaDB)來實現(xiàn)master和slave的主從信息同步;

所有ActiveMQ的broker都在不斷地獲取共享目錄的控制權(quán),哪個broker搶到了控制權(quán),它就成為master,它將鎖定該目錄,其他broker就只能成為slave。

當(dāng)master主出現(xiàn)故障后,剩下的slave從將再進(jìn)行爭奪共享目錄的控制權(quán),誰搶到共享目錄的控制權(quán),誰就成為主,其他沒有搶到控制權(quán)的稱為從。

由于他們是基于共享目錄,所以當(dāng)主出現(xiàn)故障后,其上沒有被消費的消息在接下來產(chǎn)生的新的master主中可以繼續(xù)進(jìn)行消費。

這種方式客戶端訪問的都是主,從只是起到了一個備份訪問的作用

(1) 架構(gòu)圖

(2) 實現(xiàn)步驟

A、 安裝多個ActiveMQ

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來。

復(fù)制前,先將運行的ActiveMQ停止。

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作

C、 配置每個activeMQ的conf /activemq.xml文件中的共享目錄

如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了

如果搭建在多臺服務(wù)器上,那么存放共享目錄的機器需要通過磁盤掛載的方式掛載到主從機器上。

● 修改三個ActiveMQ的共享目錄

persistenceAdapter>
    <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>

● 修改完持久化目錄后,需要在/opt目錄下創(chuàng)建該目錄

D、 配置每個activeMQ的conf /activemq.xml文件中的端口

為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1,可以將文件下載下來替換。

一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大連接數(shù);

● wireFormat.maxFrameSize 表示一個完整消息的最大數(shù)據(jù)量,單位byte;

● 0.0.0.0表示任意ip

E、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺)

第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

F、 啟動三臺ActiveMQ,可以測試驗證了

注意:啟動后會有一段時間延時,稍等一會;

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;

web控制臺能訪問的是 master,不能訪問的是 slave。

G、 修改11-activemq-java中的程序收發(fā)消息代碼

連接時使用故障轉(zhuǎn)義協(xié)議failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

 為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式

//發(fā)消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message); 
}

查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果

注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯,不能實現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測試,如果是非事務(wù)消息就注釋掉session.commit。

2、shared database Master-Slave方式主從集群

該方式與shared filesystem方式類似,只是共享的存儲介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫。

(1) 架構(gòu)圖

(2) 實現(xiàn)步驟

A、 安裝多個ActiveMQ(已做)

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來復(fù)制前,先將運行的ActiveMQ停止。

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器是jdbc數(shù)據(jù)庫方式

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

D、 配置每個數(shù)據(jù)庫連接池

注意:連接池的配置需要配置在的外面

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
</bean>

E、 在每個ActiveMQ的lib目錄下加入mysql的驅(qū)動包和數(shù)據(jù)庫連接池Druid包,該包在我提供的資料05-ActiveMQ\resources\lib下

可以通過Xftp或者rz命令上傳。

F、 啟動MySQL數(shù)據(jù)庫,并創(chuàng)建activemq數(shù)據(jù)庫

G、 配置每個activeMQ的conf /activemq.xml文件中的端口(已做)

如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了;

為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1。

第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大連接數(shù);

● wireFormat.maxFrameSize 表示一個完整消息的最大數(shù)據(jù)量,單位byte;

● 0.0.0.0表示任意ip

H、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺) (已做)

第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

I、啟動三臺ActiveMQ,可以測試驗證了

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;

web控制臺能訪問的是 master,不能訪問的是 slave。

J、 修改11-activemq-java中的程序收發(fā)消息類(已做)

連接時使用故障轉(zhuǎn)義協(xié)議failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址 

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式

//發(fā)消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
    session.commit();
}

查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果

注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯,不能實現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測試,如果是非事務(wù)消息就注釋掉session.commit。

3、Replicated LevelDB Store方式主從集群(常用)

基于可復(fù)制的LevelDB存儲方式的集群;

這種集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper從一組broker中協(xié)調(diào)選擇一個broker作為master主,其他broker作為slave從的模式。所有slave從節(jié)點通過復(fù)制master主節(jié)點的消息來實現(xiàn)消息同步,當(dāng)主出現(xiàn)故障后,沒有被消費的消息在從服務(wù)器上也同步了一份,所以不會有消息的丟失。

LevelDB 是 Google開發(fā)的一套用于持久化數(shù)據(jù)的高性能kv數(shù)據(jù)庫,ActiveMQ利用該數(shù)據(jù)庫進(jìn)行數(shù)據(jù)的存儲。

只有master 接受客戶端連接,slave不接受客戶端連接,Master的所有存儲操作都將被復(fù)制到slaves。

在這個模式中,需要有半數(shù)以上的broker是正常的,集群才是可用的,超過半數(shù)broker故障,ZooKeeper的選舉算法將不能選擇master,從而導(dǎo)致集群不可用。

(1)架構(gòu)圖

(2) 實現(xiàn)步驟

A、 安裝多個ActiveMQ(已做)

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來。

復(fù)制前,先將運行的ActiveMQ停止

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器replicatedLevelDB方式

<persistenceAdapter>
<replicatedLevelDB
    replicas="3"
    bind="tcp://0.0.0.0:0"
    zkAddress="localhost:2181"/>
</persistenceAdapter>

參數(shù)說明

● replicas :集群中存在的節(jié)點的數(shù)目

● bind :當(dāng)該節(jié)點成為master后,將使用該bind配置的ip和端口進(jìn)行數(shù)據(jù)復(fù)制

● zkAddress :ZooKeeper的地址

D、 啟動ZooKeeper服務(wù)器

E、 啟動三臺ActiveMQ,可以測試驗證了

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;

web控制臺能訪問的是 master,不能訪問的是 slave。

F、 修改11-activemq-java中的程序收發(fā)消息類(已做)

連接時使用故障轉(zhuǎn)義協(xié)議failover

ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式 

//發(fā)消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
}

查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果。

G、 把其中的一臺master關(guān)閉,留下兩臺運行,觀察效果

H、 繼續(xù)關(guān)閉下一臺master,留下一臺運行,觀察效果

I、 啟動其中一臺,讓兩個運行,再觀察效果

(3) 總結(jié)

這種方式,不適合集群太大,也就是activemq不能太多,因為多個activemq之間需要復(fù)制消息,這個比較耗資源,占用網(wǎng)絡(luò),建議3、5臺。

全部教程
主站蜘蛛池模板: 精品国产综合成人亚洲区 | 亚洲伊人国产 | 国产好大好爽久久久久久久 | 天天做.天天爱.天天综合网 | 四虎免费看 | 黄色直接观看 | 偷偷鲁影院手机在线观看 | 国产欧美亚洲精品第一区 | 视频二区 素人 欧美 日韩 | 九九精品国产兔费观看久久 | 中国欧美一级毛片免费 | 伊人这里只有精品 | 天天干免费视频 | 欧美色视频在线观看 | 插插天天 | 99re这里只有精品99 | 97影院网 | 欧美中文字幕 | 不卡国产视频 | 国产精品青青青高清在线密亚 | 国产亚洲精品久久久久久小说 | 一级女性全黄生活片免费看 | 欧美成人毛片在线视频 | 第一福利在线 | 成人在线毛片 | 韩国三日本三级中文字幕 | 欧美一区永久视频免费观看 | 香蕉97超级碰碰碰碰碰久 | 亚洲国产成人私人影院 | 波多野结衣一区二区 | 婷婷四房综合激情五月性色 | 色综合久久久久综合99 | 四虎影永久在线高清免费 | 久久精品国产视频 | 久久精品国产精品青草图片 | 日本裤袜xxxx视频 | 57pao一国产成视频永久免费 | 免费看国产精品麻豆 | 亚洲va精品中文字幕 | 国产精品天天影视久久综合网 | 亚洲成在人网站天堂一区二区 |