隐藏

C#多线程并发处理数据库数据,发送信号等待处理完统一插入

发布:2023/3/20 0:14:14作者:管理员 来源:本站 浏览次数:339

public class JPService

   {

       public JPService()

       {

           //设定最大的Net并发连接数

           System.Net.ServicePointManager.DefaultConnectionLimit = 500;

           ThreadPool.SetMinThreads(15 , 2);//设置最小的工作线程数和IO线程数

           Config.Load();

       }

       private int MaxThread = 10;//最大的纠偏线程数


       private static Random m_rand = new Random(Guid.NewGuid().GetHashCode());//创建随机数

       private static object m_objLock = new object();//添加列表锁定


       private AutoResetEvent[] InitAutoResetEvent()

       {

           //对每个线程提供一个完成信号,初始化为未终止状态

          AutoResetEvent[] autoEvents = new AutoResetEvent[MaxThread];

           for (int i = 0; i < MaxThread; i++)

               autoEvents[i] = new AutoResetEvent(false);

           return autoEvents;

       }

       /// <summary>

       /// 将处理完的信息添加至List列表

       /// </summary>

       /// <param name="lst"></param>

       /// <param name="info"></param>

       public void AddItemToList(List<EntityJPGpsInfo> lst , EntityJPGpsInfo info)

       {

           lock (m_objLock)

           {

               lst.Add(info);

           }

       }


       public void OnStart()

       {          


             try

           {


                //在线程池中引入可执行的方法,一个循环不断判断


               ThreadPool.QueueUserWorkItem(_ =>

                   {


                      //不断循环,直到当前有数据可以纠偏,则将该方法加入线程池


                       while (true)

                       {


                          AutoResetEvent[] autoEvents = InitAutoResetEvent();

                          List<EntityJPGpsInfo> lst = new List<EntityJPGpsInfo>();

                           //得到处理数据   集

                           DataSet ds = GetBeforeJP();

                           //验证数据是否存在(null)

                           if (!DataHelper.VerifyDataSet(ds))

                           {

                               Thread.Sleep(100);

                               continue;  //如果检测DataSet可行,则继续

                           }


                           int iCount = ds.Tables[0].Rows.Count; //获取要处理的数据行数

                           int iBlock = (int)Math.Ceiling((double)iCount / MaxThread);//根据行数,线程数设定每个线程要处理的数据量

                           for (int i = 0; i < MaxThread; i++) //像线程池加入执行队列

                           {

                               ThreadPool.QueueUserWorkItem(data =>

                                   {

                                       int iIndex = (int)data;//线程序号0->MaxThread

                                       DataRowCollection rows = ds.Tables[0].Rows;

                                       for (int j = 0; j < iBlock; j++)

                                       {

                                           try

                                           {

                                               if (iIndex * iBlock + j >= iCount)//避免最后一个线程索引越界

                                                   break;

                                               int iSequence = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "SEQUENCE");

                                               int iMCUID = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "MCUID");

                                               int iLng = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LONGITUDE") / 36;

                                               int iLat = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LATITUDE") / 36;

                                               DateTime dtmPostionTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "POSITIONTIME");

                                               DateTime dtmReceiveTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "RECEIVETIME");

                                               try

                                               {

                                                   //请求指定网页地址进行处理返回结果

                                                   WebRequest jpRequest = WebRequest.Create(Config.BMSSettings.JPUrl + iLng.ToString() + "," + iLat.ToString() + "&t=" + m_rand.Next());

                                                   jpRequest.Method = "get";

                                                   jpRequest.ContentType = "application/x-www-form-urlencoded";

                                                   jpRequest.Timeout = 8000;//设置超时

                                                   WebResponse jpResponse = jpRequest.GetResponse();


                                                   string strJP;

                                                   using (StreamReader sr = new StreamReader(jpResponse.GetResponseStream()))

                                                   {

                                                       strJP = sr.ReadToEnd();

                                                       if (!string.IsNullOrEmpty(strJP))

                                                       {

                                                           strJP = strJP.Remove(strJP.Length - 1 , 1);

                                                           string[] strs = strJP.Split(new char[] { ',' });

                                                           //处理完加入列表

                                                           AddItemToList(lst , new EntityJPGpsInfo(iSequence , iMCUID , Convert.ToInt32(strs[0]) * 36 , Convert.ToInt32(strs[1]) * 36 , dtmPostionTime , dtmReceiveTime));

                                                       }

                                                   }

                                                   jpResponse.Close();

                                               }

                                               catch (Exception ex)

                                               {

                                                   LogHelper.Writeln("Error1:" + ex.Message);

                                               }

                                           }

                                           catch (Exception e)

                                           {

                                               LogHelper.Writeln("Error2:" + e.StackTrace);

                                           }

                                       }

                                       autoEvents[iIndex].Set();

                                   } , i);

                           }

                           //等待收到所有的信号

                           ManualResetEvent.WaitAll(autoEvents);

                           InsertToDB(lst); //处理完集体插入到数据库

                       }

                   });

           }

           catch (Exception exc)

           {

               LogHelper.Writeln("Error3:" + exc.StackTrace);

           }

       }

       /// <summary>

       /// 数据库提取预处理数据

       /// </summary>

       /// <returns>预处理数据集</returns>

       public DataSet GetBeforeJP()

       {

           OracleParameter[] paras = new OracleParameter[]

           {

               OracleHelper.MakeOutParam("curCursor",OracleType.Cursor)

           };

           return OracleHelper.ExecuteDataSet(Config.BMSSettings.ConnectionString , "SP_GIS_GET_BEFOREJP" , paras);

       }

       /// <summary>

       /// 全部循环插入到数据库

       /// </summary>

       /// <param name="lst">插入数据列表</param>

       public void InsertToDB(List<EntityJPGpsInfo> lst)

       {

           using (OracleConnection conn = OracleHelper.GetConnection(Config.BMSSettings.ConnectionString))

           {

               conn.Open();

               OracleCommand command = conn.CreateCommand();

               OracleTransaction trans = conn.BeginTransaction();

               try

               {

                   command.Transaction = trans;

                   command.Parameters.AddRange(new OracleParameter[]

                   {

                       new OracleParameter("iSequence",OracleType.Number),

                       new OracleParameter("iMCUID",OracleType.Number),

                       new OracleParameter("iLongitude",OracleType.Number),

                       new OracleParameter("iLatitude",OracleType.Number),

                       new OracleParameter("dtmPositionTime",OracleType.DateTime),

                       new OracleParameter("dtmReceiveTime",OracleType.DateTime)

                   });


                   foreach (EntityJPGpsInfo info in lst)

                   {

                       command.Parameters[0].Value = info.Sequence;

                       command.Parameters[1].Value = info.MCUID;

                       command.Parameters[2].Value = info.Longitude;

                       command.Parameters[3].Value = info.Latitude;

                       command.Parameters[4].Value = info.PositionTime;

                       command.Parameters[5].Value = info.ReceiveTime;

                       command.CommandType = CommandType.StoredProcedure;

                       command.CommandText = "SP_GIS_ADD_JP";

                       command.ExecuteNonQuery();

                   }

                   trans.Commit();

               }

               catch (Exception ex)

               {

                   trans.Rollback();

                   throw ex;

               }

           }

       }

}