大战熟女丰满人妻av-荡女精品导航-岛国aaaa级午夜福利片-岛国av动作片在线观看-岛国av无码免费无禁网站-岛国大片激情做爰视频

專注Java教育14年 全國咨詢/投訴熱線:400-8080-105
動力節點LOGO圖
始于2009,口口相傳的Java黃埔軍校
首頁 學習攻略 Java學習 Akka Java集群示例

Akka Java集群示例

更新時間:2022-10-18 11:14:27 來源:動力節點 瀏覽1887次

介紹

這是一個 Java、Maven、Akka 項目,演示了如何設置一個基本的 Akka 集群。

這個項目是一系列項目中的一個,它從一個簡單的 Akka Cluster 項目開始,逐步構建到事件溯源和命令查詢職責分離的示例。

該項目系列由以下 GitHub 存儲庫組成:

akka-java-cluster (這個項目)

akka-java-集群感知

akka-java-cluster-singleton

akka-java-集群分片

akka-java-cluster-persistence

akka-java-cluster-persistence-query

每個項目都可以獨立于其他項目進行克隆、構建和運行。

關于 Akka 聚類

根據Akka 文檔,“ Akka Cluster 提供了一種容錯分散的基于點對點的集群成員服務,沒有單點故障或單點瓶頸。它使用 gossip 協議和自動故障檢測器來做到這一點。

Akka 集群允許構建分布式應用程序,其中一個應用程序或服務跨越多個節點。"

Akka 文檔中的上述段落包含了許多最初可能難以理解的概念??紤]一些僅在兩句話中就被拋棄的術語,例如“容錯”、“去中心化”、“點對點”和“無單點故障”。最后一句幾乎是隨便說的“一個應用程序或服務跨越多個節點”。等待; 什么?應用程序或服務如何跨越多個節點?

答案是 Akka 提供了一個抽象層,該層由參與者在參與者系統中相互交互組成。Akka 是一個演員模型的實現。演員模型” (維基百科)將“演員”視為并發計算的通用原語。為了響應它接收到的消息,演員可以:做出本地決策,創建更多演員,發送更多消息,并確定如何響應收到的下一條消息。參與者可以修改自己的私有狀態,但只能通過消息相互影響(避免需要任何鎖)。

Akka Actor 通過異步消息相互通信。Akka Actor 系統在 Java 虛擬機上運行,??并且使用 Akka 集群,單個 Actor 系統可以在邏輯上跨越多個網絡 JVM。這個網絡化的參與者系統抽象層使參與者可以跨節點集群透明地與每個參與者進行通信。一種思考方式是,從演員的角度來看,他們生活在一個演員系統中,演員系統在一個或多個節點上運行的事實在很大程度上隱藏在抽象層中。

ClusterListenerActor 演員

Akka Actor 是用 Java 或 Scala 實現的。您可以將參與者創建為 Java 或 Scala 類。有兩種實現actor的方法,無類型和有類型。這個 Akka Java 集群示例項目系列中使用了無類型的 actor。

對于那些有興趣深入了解 Actor 如何工作以及如何實現的細節的人來說,關于Actors的 Akka 文檔部分 是一個很好的起點。

我們將要查看的第一個 Actor 名為 ClusterListenerActor。該參與者設置為接收有關集群事件的消息。當節點加入和離開集群時,此參與者會收到有關這些事件的消息。然后將這些接收到的消息寫入記錄器。

ClusterListenerActor 提供了集群活動的簡單視圖。以下是日志輸出的示例:

03:20:29.569 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://[email protected]:2551/user/clusterListener - MemberUp(Member(address = akka.tcp://[email protected]:2553, status = Up)) sent to Member(address = akka.tcp://[email protected]:2551, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://[email protected]:2551/user/clusterListener - 1 (LEADER) (OLDEST) Member(address = akka.tcp://[email protected]:2551, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://[email protected]:2551/user/clusterListener - 2 Member(address = akka.tcp://[email protected]:2552, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://[email protected]:2551/user/clusterListener - 3 Member(address = akka.tcp://[email protected]:2553, status = Joining)

讓我們從完整的 ClusterListenerActor 源文件開始。請注意,此 actor 是作為擴展基于 Akka 的類的單個 Java 類實現的。

package cluster;
import akka.actor.AbstractLoggingActor;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
class ClusterListenerActor extends AbstractLoggingActor {
    private final Cluster cluster = Cluster.get(context().system());
    private Cancellable showClusterStateCancelable;
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(ShowClusterState.class, this::showClusterState)
                .matchAny(this::logClusterEvent)
                .build();
    }
    private void showClusterState(ShowClusterState showClusterState) {
        log().info("{} sent to {}", showClusterState, cluster.selfMember());
        logClusterMembers(cluster.state());
        showClusterStateCancelable = null;
    }
    private void logClusterEvent(Object clusterEventMessage) {
        log().info("{} sent to {}", clusterEventMessage, cluster.selfMember());
        logClusterMembers();
    }
    @Override
    public void preStart() {
        log().debug("Start");
        cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.ClusterDomainEvent.class);
    }
    @Override
    public void postStop() {
        log().debug("Stop");
        cluster.unsubscribe(self());
    }
    static Props props() {
        return Props.create(ClusterListenerActor.class);
    }
    private void logClusterMembers() {
        logClusterMembers(cluster.state());
        if (showClusterStateCancelable == null) {
            showClusterStateCancelable = context().system().scheduler().scheduleOnce(
                    Duration.ofSeconds(15),
                    self(),
                    new ShowClusterState(),
                    context().system().dispatcher(),
                    null);
        }
    }
    private void logClusterMembers(CurrentClusterState currentClusterState) {
        Optional<Member> old = StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
                .reduce((older, member) -> older.isOlderThan(member) ? older : member);
        Member oldest = old.orElse(cluster.selfMember());
        StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
                .forEach(new Consumer<Member>() {
                    int m = 0;
                    @Override
                    public void accept(Member member) {
                        log().info("{} {}{}{}", ++m, leader(member), oldest(member), member);
                    }
                    private String leader(Member member) {
                        return member.address().equals(currentClusterState.getLeader()) ? "(LEADER) " : "";
                    }
                    private String oldest(Member member) {
                        return oldest.equals(member) ? "(OLDEST) " : "";
                    }
                });
    }
    private static class ShowClusterState {
        @Override
        public String toString() {
            return ShowClusterState.class.getSimpleName();
        }
    }
}

這個類是一個簡單的actor實現的例子。然而,這個actor的獨特之處在于它訂閱了Akka系統來接收集群事件消息。

@Override
public void preStart() {
    log().debug("Start");
    cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
            ClusterEvent.ClusterDomainEvent.class);
}

Actor 被設置為接收集群事件消息。當這些消息到達時,參與者調用編寫的方法來記錄事件并記錄集群的當前狀態。

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(ShowClusterState.class, this::showClusterState)
            .matchAny(this::logClusterEvent)
            .build();
}

隨著集群中的每個節點啟動,ClusterListenerActor 的一個實例也會啟動。然后,參與者記錄每個節點中發生的集群事件。您可以再次從每個節點的角度檢查來自每個集群節點的日志,以查看集群事件并查看集群節點的狀態。

這個怎么運作

在這個項目中,我們將從一個基于 Akka、Java 和 Maven 的示例的基本模板開始,其中包含運行 Akka 集群的代碼和配置。Maven POM 文件使用兩個插件,一個用于使用mvn:exec命令運行代碼,另一個插件構建一個自包含 JAR 文件用于使用java -jar命令運行代碼。

當項目代碼被執行時,動作在Runner類main方法中開始。

public static void main(String[] args) {
    if (args.length == 0) {
        startupClusterNodes(Arrays.asList("2551", "2552", "0"));
    } else {
        startupClusterNodes(Arrays.asList(args));
    }
}

該main方法調用startupClusterNodes傳遞給它一個端口列表的方法。如果未提供參數,則使用默認的三個端口集。

private static void startupClusterNodes(List<String> ports) {
    System.out.printf("Start cluster on port(s) %s%n", ports);
    ports.forEach(port -> {
        ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));
        AkkaManagement.get(actorSystem).start();
        actorSystem.actorOf(ClusterListenerActor.props(), "clusterListener");
        addCoordinatedShutdownTask(actorSystem, CoordinatedShutdown.PhaseClusterShutdown());
        actorSystem.log().info("Akka node {}", actorSystem.provider().getDefaultAddress());
    });
}

這些startupClusterNodes方法循環通過端口列表。為每個端口創建一個參與者系統。

ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));

創建一個演員系統時會發生很多事情。許多決定如何運行actor系統的細節是通過配置設置定義的。這個項目包括一個application.conf配置文件,它位于src/main/resources目錄中。最關鍵的配置設置之一定義了參與者系統主機和端口。當參與者系統在集群中運行時,配置還定義了每個節點將如何定位和加入集群。在這個項目中,節點使用所謂的種子節點加入集群。

cluster {
  seed-nodes = [
    "akka.tcp://[email protected]:2551",
    "akka.tcp://[email protected]:2552"]
}

讓我們來看看這個項目的集群啟動場景。在此示例中,一個 JVM 啟動時沒有運行時參數。當不帶參數調用Runner類main方法時,默認是在端口 2551、2552 和端口 0 上創建三個參與者系統(零端口會導致隨機選擇非零端口號)。

由于每個參與者系統都是在特定端口上創建的,因此它會查看種子節點配置設置。如果參與者系統的端口是種子節點之一,它知道它將與其他種子節點聯系以形成集群。如果參與者系統的端口不是種子節點之一,它將嘗試聯系種子節點之一。非種子節點需要向其中一個種子節點宣布自己并要求加入集群。

下面是使用默認端口 2551、2552 和 0 的示例啟動場景。在端口 2551 上創建了一個參與者系統;查看配置它知道它是一個種子節點。端口 2551 上的種子節點參與者系統嘗試聯系端口 2552 上的參與者系統,即另一個種子節點。當創建端口 2552 上的參與者系統時,它會經歷相同的過程,在這種情況下,2552 會嘗試與 2551 聯系并加入。當在隨機端口(例如端口 24242)上創建第三個參與者系統時,它會從配置中知道它不是種子節點,在這種情況下,它會嘗試與種子參與者系統之一進行通信,宣布自己并加入集群。

您可能已經注意到,在上面的示例中,三個參與者系統是在單個 JVM 中創建的。雖然每個 JVM 運行多個參與者系統是可以接受的,但更常見的情況是每個 JVM 運行一個參與者系統。

讓我們看一個稍微現實一點的例子。使用提供的akka腳本啟動一個三節點集群。

./akka cluster start 3

每個節點都在單獨的 JVM 中運行。在這里,我們有三個參與者系統,它們在三個 JVM 中獨立啟動。這三個參與者系統遵循與之前相同的啟動場景,結果它們形成了一個集群。

當然,最常見的場景是每個參與者系統都是在不同的 JVM 中創建的,每個 JVM 都運行在不同的服務器、虛擬服務器或容器上。同樣,相同的啟動過程發生在各個參與者系統通過網絡找到彼此并形成集群的地方??。

讓我們回到創建actor系統的那一行代碼。

ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));

從這個簡短的描述中,您可以看到在actor系統抽象層中發生了很多事情,而這個啟動過程的總結只是冰山一角,這是抽象層應該做的,它們隱藏了復雜性。

一旦多個參與者系統加入一個集群,從在這個虛擬參與者系統中運行的參與者的角度來看,它們會形成一個單一的虛擬參與者系統。當然,單個參與者實例物理上駐留在特定 JVM 內的特定集群節點中,但在接收和發送參與者消息時,節點邊界是透明的并且幾乎消失了。正是這種透明性是構建“一個應用程序或服務跨越多個節點”的基礎。

此外,通過添加更多節點來擴展集群的靈活性是消除單點瓶頸的機制。當集群中的現有節點無法處理當前負載時,可以添加更多節點來擴展容量。失敗也是如此。一個或多個節點的丟失并不意味著整個集群出現故障。可以替換故障節點,并且可以將在故障節點上運行的參與者重新定位到其他節點。

希望本概述能夠闡明 Akka 如何提供“無單點故障或單點瓶頸”以及“ Akka 集群如何允許構建分布式應用程序,其中一個應用程序或服務跨越多個節點。 ”

安裝

git clone https://github.com/mckeeh3/akka-java-cluster.git
cd akka-java-cluster
mvn clean package

Maven 命令構建項目并創建一個自包含的可運行 JAR。

運行集群(Mac、Linux)

該項目包含一組腳本,可用于啟動和停止單個集群節點或啟動和停止節點集群。

提供主腳本./akka以運行節點集群或啟動和停止單個節點。用于./akka node start [1-9] | stop啟動和停止單個節點以及./akka cluster start [1-9] | stop啟動和停止節點集群。clusterand start 選項將node在端口 2551 到 2559 上啟動 Akka 節點。stdinand輸出都使用文件命名約定stderr發送到目錄中的文件。/tmp/tmp/-N.log

在端口 2551 上啟動節點 1,在端口 2552 上啟動節點 2。

./akka node start 1
./akka node start 2

在端口 2553 上停止節點 3。

./akka node stop 3

在端口 2551、2552、2553 和 2554 上啟動一個由四個節點組成的集群。

./akka cluster start 4

停止所有當前正在運行的集群節點。

./akka cluster stop

您可以使用該./akka cluster start [1-9]腳本啟動多個節點,然后使用./akka node start [1-9]and./akka node stop [1-9] 啟動和停止單個節點。

使用該./akka node tail [1-9]命令tail -f創建節點 1 到 9 的日志文件。

該命令使用Akka 管理 擴展 Cluster Http Management./akka cluster status以 JSON 格式顯示當前正在運行的集群的狀態 。

運行集群(Windows,命令行)

以下 Maven 命令在端口 2551、2552 和 radmonly 選擇的端口上運行帶有 3 個 Akka 演員系統的signle JVM。

mvn exec:java

使用 CTRL-C 停止。

要在特定端口上運行,請使用以下-D選項傳入命令行參數。

mvn exec:java -Dexec.args="2551"

默認無參數等價于以下內容。

mvn exec:java -Dexec.args="2551 2552 0"

運行測試的一種常見方法是在多個命令窗口中啟動單個 JVM。這模擬了運行多節點 Akka 集群。例如,在 4 個命令窗口中運行以下 4 個命令。

mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log
mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log

這將運行一個 4 節點 Akka 集群,在端口 2551 和 2552 上啟動 2 個節點,這些節點是配置的集群種子節點和application.conf文件。以及隨機選擇的端口號上的 2 個節點??蛇x重定向> /tmp/$(basename $PWD)-4.log是基于項目目錄名稱將日志輸出推送到文件名的示例。

為方便起見,在 Linux 命令 shell 中定義以下別名。

alias p1='cd ~/akka-java/akka-java-cluster'
alias p2='cd ~/akka-java/akka-java-cluster-aware'
alias p3='cd ~/akka-java/akka-java-cluster-singleton'
alias p4='cd ~/akka-java/akka-java-cluster-sharding'
alias p5='cd ~/akka-java/akka-java-cluster-persistence'
alias p6='cd ~/akka-java/akka-java-cluster-persistence-query'
alias m1='clear ; mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log'
alias m2='clear ; mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log'
alias m3='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log'
alias m4='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log'

p1-6 別名命令是 cd 到六個項目目錄之一的快捷方式。m1-4 別名命令使用適當的端口啟動和 Akka 節點。Stdout 也被重定向到 /tmp 目錄。

提交申請后,顧問老師會電話與您溝通安排學習

免費課程推薦 >>
技術文檔推薦 >>
主站蜘蛛池模板: 欧美做爰xxxⅹ性欧 欧美做爰xxxⅹ在线视频hd | 色黄网站成年女人色毛片 | 黑人特黄aa毛片 | 欧美中文字幕在线视频 | 99影视| 久久综合给合久久狠狠狠97色 | 亚洲色啦啦狠狠网站 | 亚洲一区中文字幕在线 | 日韩毛片久久91 | 欧美成人精品高清在线观看 | 高清一级毛片一本到免费观看 | 欧美男人天堂网 | 91久久澡人人爽人人添 | 狠狠色噜噜狠狠狠狠网站视频 | 久久96国产精品久久久 | 91亚洲国产 | 国产精品久久久久久福利69堂 | 思99re久久这里只有精品首页 | 91破解版在线 | 亚洲 | 日本一区二区三区在线 观看网站 | 国产精品福利视频手机免费观看 | 久久亚洲国产 | 四虎+网站+影院+网站 | 深夜免费看 | 日本特级爽毛片叫声 | 久久美| 亚洲国产一区在线精选 | 中文字幕曰韩一区二区不卡 | 国产区在线观看视频 | 久久中文字幕在线 | 欧美精品免费看 | 超乳w真性中出し冲田杏梨101 | 久久大香伊蕉在人线观看热2 | 五月婷婷啪 | 日日夜夜天天干 | 天天谢天天干 | 久久一区二区免费播放 | 婷婷国产| 成人网18免费视频 | 嫩模被xxxx视频在线观看 | 亚洲国产精品免费在线观看 |