ElasticSearch-集群读写


  • ES跨集群搜索(CCS)
  • 分片的设计和管理
    • 算分不准 dfs_query_then_fetch
    • 如何设计分片数
  • ES底层读写工作原理
    • ES写入数据的过程
    • ES读取数据的过程
    • 写数据底层原理
  • 提升集群的读写性能
    • 提升集群读取性能
    • 提升集群写入性能

ES跨集群搜索(CCS)

  • ES水平扩展存在的问题
    • 单集群水平扩展时,节点数不能无限增加
      • 当集群的meta 信息(节点,索引,集群状态)过多会导致更新压力变大,单个Active Master会成为性能瓶颈,导致整个集群无法正常工作
    • 早期版本,通过Tribe Node可以实现多集群访问的需求,但是还存在一定的问题
      • Tribe Node会以Client Node的方式加入每个集群,集群中Master节点的任务变更需要Tribe Node 的回应才能继续
      • Tribe Node 不保存Cluster State信息,一旦重启,初始化很慢
      • 当多个集群存在索引重名的情况时,只能设置一种 Prefer 规则
    • Elasticsearch 5.3 引入了跨集群搜索的功能 (Cross Cluster Search),推荐使用
      • 允许任何节点扮演联合节点,以轻量的方式,将搜索请求进行代理
      • 不需要以Client Node的形式加入其他集群
# 启动3个集群
elasticsearch.bat ‐E node.name=cluster0node ‐E cluster.name=cluster0 ‐E path.data=cluster0_data ‐E discovery.type=single‐node ‐E http.port=9200 ‐E t 
ransport.port=9300
elasticsearch.bat ‐E node.name=cluster1node ‐E cluster.name=cluster1 ‐E path.data=cluster1_data ‐E discovery.type=single‐node ‐E http.port=9201 ‐E t 
ransport.port=9301
elasticsearch.bat ‐E node.name=cluster2node ‐E cluster.name=cluster2 ‐E path.data=cluster2_data ‐E discovery.type=single‐node ‐E http.port=9202 ‐E t 
ransport.port=9302
# 在每个集群上设置动态的设置
PUT _cluster/settings
{"persistent":{"cluster":{"remote":{
  "cluster0":{"seeds":["127.0.0.1:9300"],"transport.ping_schedule":"30s"},
  "cluster1":{"seeds":["127.0.0.1:9301"],"transport.compress":true,"skip_unavailable":true},
  "cluster2":{"seeds":["127.0.0.1:9302"]}}}}}
  • CCS的配置
    • seeds 配置的远程集群的 remote cluster 的一个 node
    • connected 如果至少有一个到远程集群的连接则为 true
    • num_nodes_connected 远程集群中连接节点的数量
    • max_connections_per_cluster 远程集群维护的最大连接数
    • transport.ping_schedule 设置了tcp层面的活性监听
    • skip_unavailable 默认是false,当对应的remote cluster不可用的话,则会报错
      • 设置为true的话,当这个remote cluster不可用的时候,就会忽略
    • cluster.remote.connections_per_cluster gateway nodes数量,默认是3
    • cluster.remote.initial_connect_timeout 节点启动时等待远程节点的超时时间,默认是30s
    • cluster.remote.node.attr 一个节点属性,用于过滤remote cluster中符合gateway nodes的节点
      • 比如设置cluster.remote.node.attr=gateway,那么将匹配节点属性 node.attr.gateway:true 的node才会被该node连接用来做CCS查询
    • cluster.remote.connect 默认情况下,群集中的任意节点都可以充当federated client并连接到remote cluster
      • cluster.remote.connect可以设置为 false(默认为true)以防止某些节点连接到 remote cluster
    • 在使用api进行动态设置的时候每次都要把seeds带上
# 查询结果获取到所有集群符合要求的数据
GET /users,cluster1:users,cluster2:users/_search 
{"query":{"range":{"age":{"gte":30,"lte":40}}}}

分片的设计和管理

  • 单个分片
    • 7.0开始,新创建一个索引时,默认只有一个主分片
      • 单个分片,查询算分,聚合不准的问题都可以得以避免
    • 单个索引,单个分片时候,集群无法实现水平扩展
  • 两个分片
    • 集群增加一个节点后,Elasticsearch 会自动进行分片的移动,也叫 Shard Rebalancing

算分不准 dfs_query_then_fetch

  • 算分不准的原因
    • 相关性算分在分片之间是相互独立的,每个分片都基于自己的分片上的数据进行相关度计算
    • 这会导致打分偏离的情况,特别是数据量很少时
      • 当文档总数很少的情况下,如果主分片大于1,主分片数越多,相关性算分会越不准
  • 解决算分不准的方法
    • 数据量不大的时候,可以将主分片数设置为1
    • 当数据量足够大时候,只要保证文档均匀分散在各个分片上,结果一般就不会出现偏差
    • 使用 DFS Query Then Fetch
      • 搜索的URL中指定参数_search?search_type=dfs_query_then_fetch
      • 到每个分片把各分片的词频和文档频率进行搜集,然后完整的进行一次相关性算分
        • 耗费更加多的CPU和内存,执行性能低下,—般不建议使用
GET /blogs/_search?search_type=dfs_query_then_fetch 
{"query":{"match":{"content":"elasticsearch"}}}

如何设计分片数

  • 当分片数>节点数时
    • 一旦集群中有新的数据节点加入,分片就可以自动进行分配
    • 分片在重新分配时,系统不会有 downtime
  • 多分片的好处:  一个索引如果分布在不同的节点,多个节点可以并行执行
    • 查询可以并行执行
    • 数据写入可以分散到多个机器
  • 分片过多所带来的副作用(Shard是Elasticsearch 实现集群水平扩展的最小单位)
    • 每个分片是一个Lucene的索引,会使用机器的资源。过多的分片会导致额外的性能开销
    • 每次搜索的请求,需要从每个分片上获取数据
    • 分片的Meta 信息由Master节点维护。过多,会增加管理的负担。经验值,控制分片总数在10W以内
  • 如何确定主分片数
    • 从存储的物理角度看
      • 搜索类应用,单个分片不要超过20 GB
      • 日志类应用,单个分片不要大于50 GB
  • 为什么要控制分片存储大小
    • 提高Update 的性能
    • 进行Merge 时,减少所需的资源
    • 丢失节点后,具备更快的恢复速度
    • 便于分片在集群内 Rebalancing
  • 如何确定副本分片数(副本是主分片的拷贝)
    • 提高系统可用性︰响应查询请求,防止数据丢失
    • 需要占用和主分片一样的资源
  • 对性能的影响
    • 副本会降低数据的索引速度: 有几份副本就会有几倍的CPU资源消耗在索引上
    • 会减缓对主分片的查询压力,但是会消耗同样的内存资源
      • 如果机器资源充分,提高副本数,可以提高整体的查询 QPS
  • ES的分片策略会尽量保证节点上的分片数大致相同,但是有些场景下会导致分配不均匀
    • 扩容的新节点没有数据,导致新索引集中在新的节点
    • 热点数据过于集中,可能会产生性能问题
  • 可以通过调整分片总数,避免分配不均衡
    • index.routing.allocation.total_shards_per_node
      • index级别的,表示这个index每个Node总共允许存在多少个shard,默认值是-1表示无穷多个
    • cluster.routing.allocation.total_shards_per_node
      • cluster级别,表示集群范围内每个Node允许存在有多少个shard。默认值是-1表示无穷多个
    • 如果目标Node的Shard数超过了配置的上限,则不允许分配Shard到该Node上
    • index级别的配置会覆盖cluster级别的配置

ES底层读写工作原理

  • 写请求是写入 primary shard,然后同步给所有的 replica shard
  • 读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法

ES写入数据的过程

  1. 客户端选择一个node发送请求过去,这个node就是coordinating node (协调节点)
  2. coordinating node,对document进行路由,将请求转发给对应的node
  3. node上的primary shard处理请求,然后将数据同步到replica node
  4. coordinating node如果发现primary node和所有的replica node都搞定之后,就会返回请求到客户端

ES读取数据的过程

  • 根据id查询数据的过程:根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询
    1. 客户端发送请求到任意一个 node,成为 coordinate node
    2. coordinate node 对 doc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 随机轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡
    3. 接收请求的 node 返回 document 给 coordinate node
    4. coordinate node 返回 document 给客户端
  • 根据关键词查询数据的过程
    1. 客户端发送请求到一个 coordinate node
    2. 协调节点将搜索请求转发到所有的 shard 对应的 primary shard 或 replica shard ,都可以
    3. query phase:每个 shard 将自己的搜索结果返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果
    4. fetch phase:接着由协调节点根据 doc id 去各个节点上拉取实际的 document 数据,最终返回给客户端

写数据底层原理

  • segment file: 存储倒排索引的文件,每个segment本质上就是一个倒排索引,每秒都会生成一个segment文件
    • 当文件过多时es会自动进行segment merge(合并文件),合并时会同时将已经标注删除的文档物理删除
  • commit point: 记录当前所有可用的segment,每个commit point都会维护一个.del文件
    • 每个.del文件都有一个commit point文件(es删除数据本质是不属于物理删除)
    • 当es做删改操作时首先会在.del文件中声明某个document已经被删除
      • 文件内记录了在某个segment内某个文档已经被删除
    • 当查询请求过来时在segment中被删除的文件是能够查出来的
    • 但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉
  • translog日志文件: 为了防止elasticsearch宕机造成数据丢失保证可靠存储
    • es会将每次写入数据同时写到translog日志中
  • os cache:操作系统里面,磁盘文件其实都有一个东西,叫做os cache,操作系统缓存
    • 数据写入磁盘文件之前,会先进入os cache,先进入操作系统级别的一个内存缓存中去
  • Refresh
    • 将文档先保存在Index buffer中,以refresh_interval为间隔时间,定期清空 buffer,生成 segment
    • 借助文件系统缓存的特性,先将segment放在文件系统缓存中,并开放查询,以提升搜索的实时性
  • Translog
    • Segment没有写入磁盘,即便发生了当机,重启后,数据也能恢复
    • 从ES6.0开始默认配置是每次请求都会落盘
  • Flush(ES自动完成)
    • 删除旧的translog 文件
    • 生成Segment并写入磁盘
    • 更新commit point并写入磁盘

提升集群的读写性能

提升集群读取性能

  • 数据建模
    • 尽量将数据先行计算,然后保存到Elasticsearch 中。尽量避免查询时的 Script 计算
    • 尽量使用Filter Context,利用缓存机制,减少不必要的算分
    • 结合profile,explain API分析慢查询的问题,持续优化数据模型
    • 避免使用*开头的通配符查询
# 查询时的 Script 计算
GET blogs/_search
{"query":{"bool":{"must":[{"match":{"title":"elasticsearch"}}],"filter":{
  "script":{"script":{"source":"doc['title.keyword'].value.length()>5"}}}}}}
  • 优化分片
    • 避免Over Sharing:一个查询需要访问每一个分片,分片过多,会导致不必要的查询开销
    • 结合应用场景,控制单个分片的大小
      • Search: 20GB
      • Logging: 40GB
    • Force-merge Read-only 索引
      • 使用基于时间序列的索引,将只读的索引进行 force merge,减少segment数量
# 手动 force merge
POST /my_index/_forcemerge

提升写入性能的方法

  • 写性能优化的目标:  增大写吞吐量,越高越好
  • 客户端: 多线程,批量写
    • 可以通过性能测试,确定最佳文档数量
    • 多线程: 需要观察是否有HTTP 429(Too Many Requests)返回,实现 Retry以及线程数量的自动调节
  • 服务器端: 单个性能问题,往往是多个因素造成的
    • 需要先分解问题,在单个节点上进行调整并且结合测试,尽可能压榨硬件资源,以达到最高吞吐量
    • 使用更好的硬件。观察CPU / IO Block
    • 线程切换│堆栈状况
  • 服务器端优化写入性能的一些手段
    • 降低IO操作
      • 使用ES自动生成的文档ld
      • 一些相关的ES 配置,如Refresh Interval
    • 降低 CPU 和存储开销
      • 减少不必要分词
      • 避免不需要旳doc_values
      • 文档的字段尽量保证相同的顺予,可以提高文档的压缩率
    • 尽可能做到写入和分片的均衡负载,实现水平扩展
      • Shard Filtering / Write Load Balancer
    • 调整Bulk 线程池和队列
  • 注意:ES 的默认设置,已经综合考虑了数据可靠性,搜索的实时性,写入速度,一般不要盲目修改。一切优化,都要基于高质量的数据建模
  • 建模时的优化
    • 只需要聚合不需要搜索,index设置成false
    • 不要对字符串使用默认的dynamic mapping。字段数量过多,会对性能产生比较大的影响
    • Index_options控制在创建倒排索引时,哪些内容会被添加到倒排索引中
  • 如果需要追求极致的写入速度,可以牺牲数据可靠性及搜索实时性以换取性能
    • 牺牲可靠性: 将副本分片设置为0,写入完毕再调整回去
    • 牺牲可靠性: 修改Translog的配置
    • 牺牲搜索实时性︰增加Refresh Interval的时间
  • 降低 Refresh的频率
    • 增加refresh_interval 的数值。默认为1s ,如果设置成-1,会禁止自动refresh
      • 避免过于频繁的refresh,而生成过多的segment 文件
      • 但是会降低搜索的实时性
    • 增大静态配置参数indices.memory.index_buffer_size
      • 默认是10%,会导致自动触发refresh
PUT /my_index/_settings
{"index":{"refresh_interval":"10s"}}
  • 降低Translog写磁盘的频率,但是会降低容灾能力
    • Index.translog.durability:  默认是request,每个请求都落盘。设置成async,异步写入
    • lndex.translog.sync_interval:设置为60s,每分钟执行一次
    • Index.translog.flush_threshod_size: 默认512 m,可以适当调大。当translog 超过该值,会触发flush
  • 分片设定
    • 副本在写入时设为0,完成后再增加
    • 合理设置主分片数,确保均匀分配在所有数据节点上
    • Index.routing.allocation.total_share_per_node: 限定每个索引在每个节点上可分配的主分片数
  • 调整Bulk 线程池和队列
    • 客户端
      • 单个bulk请求体的数据量不要太大,官方建议大约5-15m
      • 写入端的 bulk请求超时需要足够长,建议60s 以上
      • 写入端尽量将数据轮询打到不同节点
    • 服务器端
      • 索引创建属于计算密集型任务,应该使用固定大小的线程池来配置
        • 来不及处理的放入队列,线程数应该配置成CPU核心数+1,避免过多的上下文切换
        • 队列大小可以适当增加,不要过大,否则占用的内存会成为GC的负担
PUT myindex
{"settings":{"index":{
 "refresh_interval":"30s", # 30s 一次 refresh
   "number_of_shards":"2"},"routing":{"allocation":{
 "total_shards_per_node":"3" # 控制分片,避免数据热点
   }},"translog":{"sync_interval":"30s",
 "durability":"async" # 降低 translog 落盘频率
   },"number_of_replicas":0},"mappings": {
 "dynamic": false, # 避免不必要的字段索引,必要时可以通过 update by query 索引必要的字段
   "properties":{}}}

文章作者: 钱不寒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录