ElasticSearch zero-dwontime reindex
将数据库中的所有数据重新导入 elasticsearch 比较慢,即使使用 bulk api 批量导入也可能要花上 5 - 10 分钟,所以我尝试使用 ES 的 alias 来尽可能减少停机的时间。
需要注意的是,我的实现主要针对数据量不是特别大,并且没有不间断的创建和更新操作的情况。更新很频繁这种情况我会在最后简单讨论一下。
本文基于 Ruby on Rails 开发环境,并使用 elasticsearch-rails
这个 gem。
简单情况下的 zero-downtime
通常情况下我们会生成和数据库表名相同的 ES 索引名,在这里我用商品 products
作为示例,ElasticSearch 的 alias 就是别名,使用它来实现不停机重建索引的步骤如下:
1. 使用当前时间创建一个新的索引,导入数据
@date = Time.now.strftime '%Y%m%d%H%M%S'
@alias_name = "#{Product.index_name}_#{@date}"
# 将数据导入 @alias_name 这个索引中
Product.import(force: true, index: @alias_name, type: Product.document_type)
这里生成的 @alias_name
格式是 products_20180711172532
这样的。
2. 将别名关联到刚才生成的索引上
这里还要注意,如果之前没有使用过 alias 别名,那么你的索引名就是 products
,应该将其删除。
# 如果存在名为 products 的索引,则将其删除
client.indices.delete index: Product.index_name rescue nil
# 将别名 products 关联到索引 products_20180711172532 上去
client.indices.put_alias index: @alias_name, name: Product.index_name
3. 将之前关联的旧索引删除
我们需要把 products 这个别名所关联的其他索引都删掉,不然搜索结果中会包含两个索引中的结果
aliases = client.indices.get_alias(index: 'products').keys
# 该方法对应 GET http://localhost:9200/products/_alias
#
# 返回的 aliases 格式如下
# {"products_20180711143627"=>{"aliases"=>{"products"=>{}}}, "products_0711_17_09_26"=>{"aliases"=>{"products"=>{}}}}
需要注意这里使用的是 get_alias
,Elasticsearch 在 6.0 版本有一些 break changes,可以在这里查看:break_changes_60,在 6.0 之前版本应该可以使用 get_aliases
方法(对应 GET products/_aliases
)。
好了,这样就完成了整个索引重建工作,把这些结合起来的代码看起来应该就是下面这样:
class Reindex
def initialize(klass_name)
raise 'unknow class name' unless klass_name
@klass = klass_name.constantize
@date = Time.now.strftime '%Y%m%d%H%M%S'
@alias_name = "#{@klass.index_name}_#{@date}"
end
def reindex
import
put_alias
delete_old_aliases
end
def put_alias
client.indices.delete index: @klass.index_name rescue nil
client.indices.put_alias index: @alias_name, name: @klass.index_name
end
def delete_old_aliases
aliases = client.indices.get_alias(index: @klass.index_name).keys
aliases.each do |alias_name|
unless alias_name == @alias_name
client.indices.delete(index: alias_name)
puts "Deleted #{alias_name}"
end
end
end
def import_documents
@klass.import(force: true, index: @alias_name, type: @klass.document_type)
end
private
def client
@client ||= @klass.__elasticsearch__.client
end
end
# 粗暴一点的使用方法就是直接 Reindex.new('Product').reindex
# 如果是线上环境还是要谨慎一些
增量式的数据
有时候我们的数据会不断增加或者频繁地更新(比如服务器 log),那么使用上面这种方法就不可行了,根据我 google 的一些方案来看,比较可行的是在 Redis 中设置一个锁,锁开启后就将后续的 ES 操作按照 bulk 更新的 json 格式存在 Redis 中,在旧数据迁移之后再按照存在 Redis 中 ES 指令进行批量更新,最后关闭这个锁。
然而如果是重新使用数据库数据进行导入的话,还是会存在不一致,除非这个锁把数据库的各种操作也给锁了,所以这样实现起来还是比较困难的,我目前也没有这样的需求场景,就略过了。