隐藏

SolrCloud-如何在.NET程序中使用

发布:2022/6/21 20:36:42作者:管理员 来源:本站 浏览次数:572

原来我们在我们的项目里用的是根据数据库路由到不同的单机Solr服务器,但是这样的话,每次Solr配置的修改都要修改三台不通的服务器,而且一台服务器挂了,必定会影响一部分用户不能使用搜索功能,而且还会造成一定程度的丢数据,所以我们换一种方式。


   两种可选方案:


       主从模式

       SolrCloud


经过对比,决定用SolrCloud,SolrCloud的概念和优缺点,就不再赘述了,网上一搜一大堆,这里主要写一下在C#如何使用SolrCloud。


最早我们用 EasyNet.Solr 来进行Solr查询的,但是貌似EasyNet.Solr 没有对SolrCloud的查询做封装,所以就各种找资料,最后只能自己封装了。


首先SolrCloud是通过zookeeper来调度的,那么我们就要先去zookeeper上面去load可用的节点,并且对 zookeeper的 clusterstate.json 文件做心跳检测,如果clusterstate.json 里的内容有变化,则说明节点状态有变化,需要重新load文件里的内容进行解析,否则就在应用程序第一次调用的时候加载一次,缓存起来。


献上核心代码


要先通过NuGet安装Zookeeper Client


在EasyNet.Solr根目录定义接口ISolrCloudOperrations.cs


using System;  

using System.Collections.Generic;  

using System.Linq;  

using System.Text;


namespace EasyNet.Solr  

{

   public interface ISolrCloudOperrations

   {

       string GetSolrCloudServer(string collectionName, bool isWrite);

   }

}


在Impl文件夹里实现接口 SolrCloudOperations.cs


internal static class ZookeeperStatus  

   {

       //每隔两秒钟pingzookeeper服务器,并监听solr服务器状态,如果状态有改变,更新状态文件,请求连接列表

       private static void Ping(string zkHost) {

           //如果是第一次调用,则加载配置文件

           if (DataLength == 0)

           {

               foreach (var host in zkHost.Split(',').ToList())

               {

                   Start(host);

               }

               Task.Factory.StartNew(() =>

               {

                   while (true)

                   {

                       foreach (var host in zkHost.Split(',').ToList())

                       {

                           Task.Factory.StartNew(Start, host);

                       }

                       Thread.Sleep(2000);

                   }

               });

           }

       }

       private static object pingObj = new object();

       private static void Start(object zkHost) {

           var watcher = new Watcher();

           using (var zk = new ZooKeeper(zkHost.ToString(), new TimeSpan(0, 0, 0, 10000), watcher))

           {

               var dataChange = watcher.WaitUntilWatched();

               Org.Apache.Zookeeper.Data.Stat stat = null;

               try

               {

                   stat = zk.Exists("/clusterstate.json", false);

               }

               finally

               {

                   if (stat != null)

                   {

                       byte[] data = null;

                       lock (pingObj)

                       {

                           if (DataLength == 0 || DataLength != stat.DataLength)

                           {

                               data = zk.GetData("/clusterstate.json", false, stat);

                               DataLength = stat.DataLength;

                               SetShard(data);

                           }

                       }

                   }

               }

           }

       }


       private static int DataLength = 0;


       private static Dictionary<string, List<Shard>> shards = new Dictionary<string, System.Collections.Generic.List<Shard>>();

       private static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();

       private static void SetShard(byte[] data)

       {

           var str = System.Text.Encoding.UTF8.GetString(data);

           JObject jObj = JsonConvert.DeserializeObject<JObject>(str);

           List<Shard> shardList = new List<Shard>();

           try

           {

               rw.EnterWriteLock();

               foreach (var p in jObj)

               {

                   foreach (var item in p.Value["shards"])

                   {

                       var jItem = item.First as JObject;

                       if (jItem["state"].ToString() == "active")

                       {

                           foreach (var replica in jItem["replicas"])

                           {

                               var jReplica = replica.First as JObject;

                               if (jReplica["state"].ToString() == "active")

                               {

                                   Shard shard = new Shard() { BaseUrl = jReplica["base_url"].ToString() };

                                   if (jReplica["leader"] != null && "true" == jReplica["leader"].ToString())

                                   {

                                       shard.Leader = true;

                                   }

                                   shardList.Add(shard);

                               }

                           }

                       }

                   }

                   if (shards.ContainsKey(p.Key))

                   {

                       shards[p.Key] = shardList;

                   }

                   else

                   {

                       shards.Add(p.Key, shardList);

                   }

               }

           }

           finally

           {

               rw.ExitWriteLock();

           }

       }



       public static string ZookeeperHost = "127.0.0.1:2181";


       public static string GetCollection(string collectionName, bool isWrite)

       {

           ///第一次调用,则开始ping

           if (DataLength == 0)

           {

               Ping(ZookeeperHost);

           }

           IEnumerable<Shard> tempShardList = null;

           try

           {

               rw.EnterReadLock();

               var shardList = shards[collectionName];


               if (!isWrite)

               {

                   tempShardList = shardList.Where(s => s.Leader == false);

               }

               //如果从库挂了,那么只能从主库读取了

               if (tempShardList == null || tempShardList.Count() == 0)

                   tempShardList = shardList.Where(s => s.Leader == true);

           }

           finally

           {

               rw.ExitReadLock();

           }

           if (tempShardList == null) throw new Exception("no active shard");

           //随机取值

           int random = new Random().Next(tempShardList.Count() - 1);

           return tempShardList.ToList()[random].BaseUrl;

       }


   }

   internal class Shard

   {

       public string BaseUrl { get; set; }

       public bool Leader { get; set; }

   }

   internal enum Changed

   {

       None,

       Children,

       Data

   }

   internal class Watcher : IWatcher

   {

       private readonly ManualResetEventSlim _changed = new ManualResetEventSlim(false);

       private WatchedEvent _event;

       public Changed WaitUntilWatched()

       {

           _changed.Wait();

           if (_event == null) throw new ApplicationException("bad state");

           if (_event.State != KeeperState.SyncConnected)

               throw new ApplicationException("cannot connect");

           if (_event.Type == EventType.NodeChildrenChanged)

           {

               return Changed.Children;

           }

           if (_event.Type == EventType.NodeDataChanged)

           {

               return Changed.Data;

           }

           return Changed.None;

       }

       void IWatcher.Process(WatchedEvent @event)

       {

           _event = @event;

           _changed.Set();

       }

   }


这样我们如果是write操作,就会调用leader分片对应的节点进行写入,如果是查询操作,则尽量直接调用非leader来进行查询,如果非leader节点挂了,那么久从主节点进行查询。