Updated: 2024-10-16 20:19:02
I receive errors with my F# implementation of SimpleClusterListener

I have observed the following error on my F# implementation of SimpleClusterListener:

[ERROR][3/20/2017 11:32:53 AM][Thread 0008][[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%400.0.0.0%3A2552-5/endpointWriter#1522364225]] Dropping message [Akka.Actor.ActorSelectionMessage] for non-local recipient [[akka.tcp://ClusterSystem@localhost:2552/]] arriving at [akka.tcp://ClusterSystem@localhost:2552] inbound addresses [akka.tcp://ClusterSystem@0.0.0.0:2552]

I ran the C# implementation (referenced in the Appendix below) with no issues. In addition, I am using the same ports that the C# implementation is using.

NOTE:

I'm new to Akka.NET, so I'm struggling to work out where I went wrong in porting the example.

My implementation is as follows:

Main.fs

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    let config =
        ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
                            .WithFallback(section.AkkaConfig)
    let system = ActorSystem.Create ("ClusterSystem", config)
    let actorRef = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(actorRef, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551"; "2552"; "0"]
    Console.WriteLine("Press any key to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

SimpleClusterListener.fs

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =
    inherit UntypedActor()

    override this.PreStart() =
        let cluster = Cluster.Get(UntypedActor.Context.System)
        let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
                                            typeof<ClusterEvent.UnreachableMember> |]
        cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()
        match message with
        | :? ClusterEvent.MemberUp as e -> log.Info("Member is up: {0}", e.Member)
        | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
        | :? ClusterEvent.MemberRemoved as e -> log.Info("Member is removed: {0}", e.Member)
        | _ -> ()

    override this.PostStop() =
        let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
        cluster.Unsubscribe base.Self

The OnReceive method above is never invoked, although the PreStart method is.

Appendix:

As stated earlier, I ported the C# implementation below, which runs successfully for me. So I am confused about where I went wrong in porting it.

//-----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
//     Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
//     Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration.Hocon;
using System;
using System.Configuration;

namespace Samples.Cluster.Simple
{
    class Program
    {
        static void Main(string[] args)
        {
            StartUp(args.Length == 0 ? new String[] { "2551", "2552", "0" } : args);
            Console.WriteLine("Press any key to exit");
            Console.ReadLine();
        }

        public static void StartUp(string[] ports)
        {
            var section = (AkkaConfigurationSection)ConfigurationManager.GetSection("akka");
            foreach (var port in ports)
            {
                //Override the configuration of the port
                var config =
                    ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
                        .WithFallback(section.AkkaConfig);

                //create an Akka system
                var system = ActorSystem.Create("ClusterSystem", config);

                //create an actor that handles cluster domain events
                system.ActorOf(Props.Create(typeof(SimpleClusterListener)), "clusterListener");
            }
        }
    }
}

//-----------------------------------------------------------------------
// <copyright file="SimpleClusterListener.cs" company="Akka.NET Project">
//     Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
//     Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Cluster;
using Akka.Event;

namespace Samples.Cluster.Simple
{
    public class SimpleClusterListener : UntypedActor
    {
        protected ILoggingAdapter Log = Context.GetLogger();
        protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);

        /// <summary>
        /// Need to subscribe to cluster changes
        /// </summary>
        protected override void PreStart() =>
            Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents,
                new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.UnreachableMember) });

        /// <summary>
        /// Re-subscribe on restart
        /// </summary>
        protected override void PostStop() => Cluster.Unsubscribe(Self);

        protected override void OnReceive(object message)
        {
            var up = message as ClusterEvent.MemberUp;
            if (up != null)
            {
                var mem = up;
                Log.Info("Member is Up: {0}", mem.Member);
            }
            else if (message is ClusterEvent.UnreachableMember)
            {
                var unreachable = (ClusterEvent.UnreachableMember)message;
                Log.Info("Member detected as unreachable: {0}", unreachable.Member);
            }
            else if (message is ClusterEvent.MemberRemoved)
            {
                var removed = (ClusterEvent.MemberRemoved)message;
                Log.Info("Member is Removed: {0}", removed.Member);
            }
            else if (message is ClusterEvent.IMemberEvent)
            {
                //IGNORE
            }
            else if (message is ClusterEvent.CurrentClusterState)
            {
            }
            else
            {
                Unhandled(message);
            }
        }
    }
}

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }
            remote {
              log-remote-lifecycle-events = DEBUG
              dot-netty.tcp {
                hostname = "localhost"
                port = 0
              }
            }
            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]
              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>

Best answer

Assuming you're using the 1.1.3 packages, you should use

ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + port)
    .WithFallback(section.AkkaConfig);

and not the dot-netty transport. That transport has not been released yet and is only available in the dev branch.
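
For the F# port in the question, the same change might look like the sketch below. It assumes the 1.1.3 packages, where the TCP transport is configured under akka.remote.helios.tcp. The function name createSystem is made up for illustration. The hostname override is an assumption that goes beyond the answer: the log line reports an inbound address of 0.0.0.0:2552, which suggests the hostname set under dot-netty.tcp in the config file was not applied either, so the sketch sets it under helios.tcp as well. Renaming the dot-netty.tcp block in the config file to helios.tcp should have the same effect.

```fsharp
open System.Configuration
open Akka.Actor
open Akka.Configuration
open Akka.Configuration.Hocon

// Assumption: Akka.NET 1.1.3 packages, whose TCP transport reads its settings
// from akka.remote.helios.tcp rather than akka.remote.dot-netty.tcp.
// createSystem is a hypothetical name used only for this sketch.
let createSystem (port: string) =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection

    // Override both hostname and port under the helios transport.
    // The hostname line is an assumption based on the 0.0.0.0 inbound
    // address in the error log; the answer itself only changes the port key.
    let overrides =
        ConfigurationFactory.ParseString(
            "akka.remote.helios.tcp.hostname = localhost\n" +
            "akka.remote.helios.tcp.port = " + port)

    // Everything else (provider, seed nodes) still comes from the config file.
    ActorSystem.Create("ClusterSystem", overrides.WithFallback(section.AkkaConfig))
```

Calling createSystem for each of "2551", "2552" and "0", then creating the clusterListener actor on the returned system, mirrors the createActor function from the question.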

Published: 2023-07-04 17:08:00
Article link: https://www.elefans.com/category/jswz/34/1026960.html