隐藏

C#程序,超大数据量,超高并发任务,应该如何处理?

发布:2023/3/19 23:56:28作者:管理员 来源:本站 浏览次数:365

用C#写一个程序,运行在一台10G内存的服务器上,

作用是24小时不停的 ping 10万个ip,以获取每个ip的主机是否正常连接,

每隔2秒钟这10万个ip就全部ping一次,

并记录返回的指定信息,部分信息需要存储在硬盘或者数据库上.

与此同时,返回的信息需要一些运算和加工,展现给使用者.


如果ip只有少量几个,而且不是不停的运行,那我可以轻松实现,

但是大量的数据和高并发性出现后,C#的程序是不是应该大换血了?

究竟应该如何处理代码?如何做这个程序?


新的测试结果表明,失败率高也是由System.Net.NetworkInformation.Ping或者windows操作系统造成的。至于为什么会这样,还不清楚。 测试还是以2秒为一次循环,超时设置为2秒,最大PING实例数量为256(本地测试中比较稳定的最大并发数,也许128能更稳定),IP数量为2^n。下面是本地测试结果: IP数量 <= 8192 ,完全正常,循环周期 < 1s。100KB/s <= 网络流量 300KB/s,CPU <= 50%。 IP数量 = 16384 ,基本正常,1s < 循环周期 < 2s。200KB/s <= 网络流量 600KB/s,70% <= CPU <= 90%。 我本地的带宽是4Mb/s = 512KB/s (上面的600KB/s是任务管理器中的6%),测试只能到这里了。有一点要注意,超时或者出错对于CPU占用率的增大影响严重。 基于以上测试,我认为只要带宽达到2MB/s,6个CPU核心就差不多可以达到楼主需求了(我的CPU是单核心Inter Celeron M CPU 520 1.6GHz)。当然重要的是,PING的失败率不能太高,否则4个CPU承受不了,而且循环周期会被延长。 下面是基于fastCSharp的测试程序,由于原来是异步操作,后来改为同步只是简单修改了一下,所以并不逻辑有些怪异。

        sealed class pingInterval : IDisposable
        {
            private sealed class pinger : IDisposable
            {
                private readonly pingInterval pingInterval;
                private fastCSharp.net.ping ping;
                private System.Net.IPEndPoint ip;
                public pinger(pingInterval pingInterval)
                {
                    this.pingInterval = pingInterval;
                    ping = new fastCSharp.net.ping(1000, pingInterval.timeoutMilliseconds, null);
                }
                internal void Next()
                {
                    fastCSharp.threading.task.TinyTask.Add(next);
                }
                private void next()
                {
                    for (ip = pingInterval.getIP(); ip != null; ip = pingInterval.getIP()) pingInterval.onCompleted(ip, ping.Ping(ip));
                    pingInterval.free(this);
                }
                public void Dispose()
                {
                    fastCSharp.net.ping ping = this.ping;
                    this.ping = null;
                    if (ping != null) ping.Dispose();
                }
            }
            private const int maxPingCount = 256;
            private readonly System.Net.IPEndPoint[] ips;
            private int ipIndex;
            private readonly object ipLock = new object();
            private readonly int intervalSeconds;
            private readonly int timeoutMilliseconds;
            private action<System.Net.IPEndPoint, bool> onCompleted;
            private pinger[] pings;
            private readonly pinger[] freePings;
            private readonly pinger[] nextPings;
            private int freePingIndex;
            private DateTime pingTime;
            private readonly System.Timers.Timer timer;
            private readonly object pingLock = new object();
            public pingInterval(System.Net.IPEndPoint[] ips, int intervalSeconds, int timeoutMilliseconds, action<System.Net.IPEndPoint, bool> onCompleted)
            {
                this.ips = ips;
                this.intervalSeconds = intervalSeconds;
                this.timeoutMilliseconds = timeoutMilliseconds;
                this.onCompleted = onCompleted;
                freePingIndex = (intervalSeconds * 1000) / (timeoutMilliseconds + 1000);
                if (freePingIndex <= 1) freePingIndex = ips.Length;
                else freePingIndex = ips.Length / freePingIndex + 1;
                if (freePingIndex > maxPingCount) freePingIndex = maxPingCount;
                pings = new pinger[freePingIndex];
                freePings = new pinger[freePingIndex];
                nextPings = new pinger[freePingIndex];
                for (int index = freePingIndex; index != 0; pings[index] = freePings[index] = new pinger(this)) --index;
                timer = new System.Timers.Timer(intervalSeconds * 1000);
                timer.AutoReset = false;
                timer.Elapsed += next;
                pingTime = DateTime.Now;
                next(null, null);
            }
            private void next(object sender, System.Timers.ElapsedEventArgs e)
            {
                while (wait())
                {
                    Console.WriteLine("Start " + DateTime.Now.toString());
                    System.Threading.Monitor.Enter(pingLock);
                    int count = freePingIndex;
                    try
                    {
                        Array.Copy(freePings, nextPings, freePingIndex);
                        freePingIndex = 0;
                    }
                    finally { System.Threading.Monitor.Exit(pingLock); }
                    while (count != 0) nextPings[--count].Next();
                    DateTime now = DateTime.Now;
                    if ((pingTime = pingTime.AddSeconds(this.intervalSeconds)) > now)
                    {
                        timer.Interval = (pingTime - now).TotalMilliseconds;
                        if (pings != null) timer.Start();
                        break;
                    }
                }
            }
            private bool wait()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        if (ipIndex != 0) System.Threading.Monitor.Wait(ipLock);
                        ipIndex = ips.Length;
                        return true;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return false;
            }
            private System.Net.IPEndPoint getIP()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (ipIndex != 0)
                    {
                        System.Net.IPEndPoint ip = ips[--ipIndex];
                        if (ipIndex == 0) System.Threading.Monitor.Pulse(ipLock);
                        return ip;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return null;
            }
            private void free(pinger ping)
            {
                System.Threading.Monitor.Enter(pingLock);
                try
                {
                    freePings[freePingIndex++] = ping;
                }
                finally { System.Threading.Monitor.Exit(pingLock); }
            }
            public void Dispose()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        timer.Stop();
                        timer.Elapsed -= next;
                        foreach (pinger ping in pings) ping.Dispose();
                        pings = null;
                    }
                    System.Threading.Monitor.Pulse(ipLock);
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
            }
        }
        static int pingCount;
        static int pingErrorCount;
        static int loopCount = -1;
        static System.Net.IPEndPoint[] ips;
        static int ipCount;
        static void onPing(System.Net.IPEndPoint ip, bool isPing)
        {
            if (System.Threading.Interlocked.Decrement(ref ipCount) == -1)
            {
                System.Threading.Interlocked.Add(ref ipCount, ips.Length);
                if (++loopCount != 0) Console.WriteLine(DateTime.Now.toString() + " LOOP[" + loopCount.toString() + "] Count[" + (pingCount / loopCount).toString() + "] ErrorCount[" + pingErrorCount.toString() + "]");
            }
            if (isPing) ++pingCount;
            else ++pingErrorCount;
            //Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + " " + ip.ToString() + " : " + (reply != null && reply.Status == System.Net.NetworkInformation.IPStatus.Success).ToString());
        }
        static void Main(string[] args)
        {
            ips = System.IO.File.ReadAllText(@"d:\ip.txt").Split(',').left(16384).getArray(value => new System.Net.IPEndPoint(System.Net.IPAddress.Parse(value), 0));
            using (pingInterval pingInterval = new pingInterval(ips, 2, 2000, onPing))
            {
                Console.ReadKey();
            }
            Console.WriteLine("End");
 
            Console.ReadKey();
        }

修改了一个关于Timer的BUG,增加了PING超时参数。

        sealed class pingInterval : IDisposable
        {
            private sealed class pinger : IDisposable
            {
                private readonly pingInterval pingInterval;
                private System.Net.NetworkInformation.Ping ping;
                private System.Net.IPAddress ip;
                public pinger(pingInterval pingInterval)
                {
                    this.pingInterval = pingInterval;
                    ping = new System.Net.NetworkInformation.Ping();
                    ping.PingCompleted += pingCompleted;
                }
                internal void Next()
                {
                    while (ping != null)
                    {
                        if ((ip = pingInterval.getIP()) == null)
                        {
                            pingInterval.free(this);
                            break;
                        }
                        else
                        {
                            try
                            {
                                ping.SendAsync(ip, pingInterval.timeoutMilliseconds, this);
                                break;
                            }
                            catch { pingInterval.onCompleted(ip, false); }
                        }
                    }
                }
                private void pingCompleted(object sender, System.Net.NetworkInformation.PingCompletedEventArgs e)
                {
                    pingInterval.onCompleted(ip, e.Error == null);
                    Next();
                }
                public void Dispose()
                {
                    if (ping != null)
                    {
                        ping.PingCompleted -= pingCompleted;
                        ping.Dispose();
                        ping = null;
                    }
                }
            }
            private const int maxPingCount = 1 << 15;
            private readonly System.Net.IPAddress[] ips;
            private int ipIndex;
            private readonly object ipLock = new object();
            private readonly int intervalSeconds;
            private readonly int timeoutMilliseconds;
            private action<System.Net.IPAddress, bool> onCompleted;
            private pinger[] pings;
            private readonly pinger[] freePings;
            private readonly pinger[] nextPings;
            private int freePingIndex;
            private DateTime pingTime;
            private readonly System.Timers.Timer timer;
            private readonly object pingLock = new object();
            public pingInterval(System.Net.IPAddress[] ips, int intervalSeconds, int timeoutMilliseconds, action<System.Net.IPAddress, bool> onCompleted)
            {
                this.ips = ips;
                this.intervalSeconds = intervalSeconds;
                this.timeoutMilliseconds = timeoutMilliseconds;
                this.onCompleted = onCompleted;
                freePingIndex = (intervalSeconds * 1000) / (timeoutMilliseconds + 100);
                if (freePingIndex <= 1) freePingIndex = ips.Length;
                else freePingIndex = ips.Length / freePingIndex + 1;
                if (freePingIndex > maxPingCount) freePingIndex = maxPingCount;
                pings = new pinger[freePingIndex];
                freePings = new pinger[freePingIndex];
                nextPings = new pinger[freePingIndex];
                for (int index = freePingIndex; index != 0; pings[index] = freePings[index] = new pinger(this)) --index;
                timer = new System.Timers.Timer(intervalSeconds * 1000);
                timer.AutoReset = false;
                timer.Elapsed += next;
                pingTime = DateTime.Now;
                next(null, null);
            }
            private void next(object sender, System.Timers.ElapsedEventArgs e)
            {
                while (wait())
                {
                    System.Threading.Monitor.Enter(pingLock);
                    int count = freePingIndex;
                    try
                    {
                        Array.Copy(freePings, nextPings, freePingIndex);
                        freePingIndex = 0;
                    }
                    finally { System.Threading.Monitor.Exit(pingLock); }
                    while (count != 0) nextPings[--count].Next();
                    DateTime now = DateTime.Now;
                    if ((pingTime = pingTime.AddSeconds(this.intervalSeconds)) > now)
                    {
                        timer.Interval = (pingTime - now).TotalMilliseconds;
                        if (pings != null) timer.Start();
                        break;
                    }
                }
            }
            private bool wait()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        if (ipIndex != 0) System.Threading.Monitor.Wait(ipLock);
                        ipIndex = ips.Length;
                        return true;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return false;
            }
            private System.Net.IPAddress getIP()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (ipIndex != 0)
                    {
                        System.Net.IPAddress ip = ips[--ipIndex];
                        if (ipIndex == 0) System.Threading.Monitor.Pulse(ipLock);
                        return ip;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return null;
            }
            private void free(pinger ping)
            {
                System.Threading.Monitor.Enter(pingLock);
                try
                {
                    freePings[freePingIndex++] = ping;
                }
                finally { System.Threading.Monitor.Exit(pingLock); }
            }
            public void Dispose()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        timer.Stop();
                        timer.Elapsed -= next;
                        foreach (pinger ping in pings) ping.Dispose();
                        pings = null;
                    }
                    System.Threading.Monitor.Pulse(ipLock);
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
            }
        }


没有测试过这么大量的IP,我只写过10来个IP的PING,把IP放在数组里。 ping一个远程IP(发送32字节的包)平均要3ms,1千个IP要3s,1万个ip要30s,理论是可行,如果包大些,就不够了。你用下面代码测试一下,ping一个IP要多少时间? 

  public string PingT(string ip)
        {
            System.Net.NetworkInformation.Ping ping = new System.Net.NetworkInformation.Ping();
            using (ping)
            {
                PingOptions options = new PingOptions
                {
                    DontFragment = true
                };
                string s = "Test Data!";
                byte[] bytes = Encoding.ASCII.GetBytes(s);
                int timeout = 1000;
                PingReply reply = ping.Send(ip, timeout, bytes, options);
                             
                if (reply.Status == IPStatus.Success)
                {
                    success = "1";
                    ipreply = reply.RoundtripTime.ToString()+"ms";
 
                }
                else
                {
                    success = "0";
                    ipreply = "网络异常!";
                }
               //ping.Dispose();
 
            }
            return (success + " " + ipreply );
        }

闲着没事,针对楼主需求写了一个PING处理类

        class pingInterval : IDisposable
        {
            private class pinger : IDisposable
            {
                private pingInterval pingInterval;
                private System.Net.NetworkInformation.Ping ping;
                private System.Net.IPAddress ip;
                public pinger(pingInterval pingInterval)
                {
                    this.pingInterval = pingInterval;
                    ping = new System.Net.NetworkInformation.Ping();
                    ping.PingCompleted += pingCompleted;
                }
                internal void Next()
                {
                    while (ping != null)
                    {
                        if ((ip = pingInterval.getIP()) == null)
                        {
                            pingInterval.free(this);
                            break;
                        }
                        else
                        {
                            try
                            {
                                ping.SendAsync(ip, 100, this);
                                break;
                            }
                            catch { pingInterval.onCompleted(ip, false); }
                        }
                    }
                }
                private void pingCompleted(object sender, System.Net.NetworkInformation.PingCompletedEventArgs e)
                {
                    pingInterval.onCompleted(ip, e.Error == null);
                    Next();
                }
                public void Dispose()
                {
                    if (ping != null)
                    {
                        ping.PingCompleted -= pingCompleted;
                        ping.Dispose();
                        ping = null;
                    }
                }
            }
            private readonly System.Net.IPAddress[] ips;
            private int ipIndex;
            private readonly object ipLock = new object();
            private readonly int intervalSeconds;
            private action<System.Net.IPAddress, bool> onCompleted;
            private pinger[] pings;
            private readonly pinger[] freePings;
            private readonly pinger[] nextPings;
            private int freePingIndex;
            private DateTime pingTime;
            private readonly System.Timers.Timer timer;
            private readonly object pingLock = new object();
            public pingInterval(System.Net.IPAddress[] ips, int intervalSeconds, action<System.Net.IPAddress, bool> onCompleted)
            {
                this.ips = ips;
                this.intervalSeconds = intervalSeconds;
                this.onCompleted = onCompleted;
                freePingIndex = ips.Length / (intervalSeconds * 5) + 1;
                pings = new pinger[freePingIndex];
                freePings = new pinger[freePingIndex];
                nextPings = new pinger[freePingIndex];
                for (int index = freePingIndex; index != 0; pings[index] = freePings[index] = new pinger(this)) --index;
                timer = new System.Timers.Timer(intervalSeconds * 1000);
                timer.Elapsed += next;
                pingTime = DateTime.Now;
                next(null, null);
            }
            private void next(object sender, System.Timers.ElapsedEventArgs e)
            {
                if (sender != null) timer.Stop();
                while (wait())
                {
                    System.Threading.Monitor.Enter(pingLock);
                    int count = freePingIndex;
                    try
                    {
                        Array.Copy(freePings, nextPings, freePingIndex);
                        freePingIndex = 0;
                    }
                    finally { System.Threading.Monitor.Exit(pingLock); }
                    while (count != 0) nextPings[--count].Next();
                    DateTime now = DateTime.Now;
                    if ((pingTime = pingTime.AddSeconds(this.intervalSeconds)) > now)
                    {
                        timer.Interval = (pingTime - now).TotalMilliseconds;
                        if (pings != null) timer.Start();
                        break;
                    }
                }
            }
            private bool wait()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        if (ipIndex != 0) System.Threading.Monitor.Wait(ipLock);
                        ipIndex = ips.Length;
                        return true;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return false;
            }
            private System.Net.IPAddress getIP()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (ipIndex != 0)
                    {
                        System.Net.IPAddress ip = ips[--ipIndex];
                        if (ipIndex == 0) System.Threading.Monitor.Pulse(ipLock);
                        return ip;
                    }
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
                return null;
            }
            private void free(pinger ping)
            {
                System.Threading.Monitor.Enter(pingLock);
                try
                {
                    freePings[freePingIndex++] = ping;
                }
                finally { System.Threading.Monitor.Exit(pingLock); }
            }
            public void Dispose()
            {
                System.Threading.Monitor.Enter(ipLock);
                try
                {
                    if (pings != null)
                    {
                        timer.Stop();
                        timer.Elapsed -= next;
                        foreach (pinger ping in pings) ping.Dispose();
                        pings = null;
                    }
                    System.Threading.Monitor.Pulse(ipLock);
                }
                finally { System.Threading.Monitor.Exit(ipLock); }
            }
        }

当然你也可以修改并且直接创建10W个PingIP对象,每个对象仅仅处理一个IP,那么应该复用这些PingIP对象。

        class PingIP : IDisposable
        {
            private System.Net.NetworkInformation.Ping ping;
            private func<System.Net.IPAddress> getIP;
            private action<System.Net.IPAddress, bool> onCompleted;
            private System.Net.IPAddress currentIP;
            PingIP(func<System.Net.IPAddress> getIP, action<System.Net.IPAddress, bool> onCompleted)
            {
                this.getIP = getIP;
                this.onCompleted = onCompleted;
                ping = new System.Net.NetworkInformation.Ping();
                ping.PingCompleted += pingCompleted;
                next();
            }
            private void pingCompleted(object sender, System.Net.NetworkInformation.PingCompletedEventArgs e)
            {
                this.onCompleted(currentIP, e.Error == null);
                next();
            }
            private void next()
            {
                while (ping != null)
                {
                    if ((currentIP = getIP()) == null) Dispose();
                    else
                    {
                        try
                        {
                            ping.SendAsync(currentIP, 1000, this);
                            break;
                        }
                        catch
                        {
                            this.onCompleted(currentIP, false);
                        }
                    }
                }
            }
            public void Dispose()
            {
                if (ping != null)
                {
                    ping.PingCompleted -= pingCompleted;
                    ping.Dispose();
                    ping = null;
                }
            }
        }