基于Logstash的Elasticsearch数据迁移方案

基于Logstash的Elasticsearch数据迁移方案

1、背景信息

基于Logstash的Elasticsearch数据迁移方案.jpg-100.2kB

2、数据迁移前置条件

基于logstash的增量方案,无法适用,因为增量的数据同步可能还得需要date的时间类型,但是目前趣街的关于时间的字段均是keyword类型,甚至可能有一些索引没有关于时间的字段。

  • 对内对客的所有索引新增时间戳字段
  • 使用ES的应用程序均需要改造
    • 后续在创建新索引的时候,自动添加时间戳字段,现有的索引,我们运维通过API进行新增
    • doc文档发生变动后,需要同步维护时间戳字段,将其更新为当前时间,用于logstash数据采集

3、环境与工具准备

3.1、阿里侧环境与实例准备

3.2、自建侧环境与实例准备

4、迁移索引元数据(设置和映射)

  • 在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。

4.1、创建indiceCreate.py迁移脚本

1
2
3
4
5
6
7
## 分片数默认和源集群索引保持一致,可根据生产实际节点数修改数值。
#number_of_shards = 6
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]

## 副本数默认为0,可根据生产实际节点数修改数值。
#number_of_replicas = 1
number_of_replicas = DEFAULT_REPLICAS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
vim indiceCreate.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 源集群host。
oldClusterHost = "192.168.36.59:9200"
## 源集群用户名,可为空。
oldClusterUserName = ""
## 源集群密码,可为空。
oldClusterPassword = ""
## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
newClusterHost = "10.60.2.31:9200"
## 目标集群用户名。
newClusterUser = ""
## 目标集群密码。
newClusterPassword = ""
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
conn = httplib.HTTPConnection(host)
headers = {}
if (username != "") :
'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
headers["Authorization"] = "Basic %s" % base64string;
if "GET" == method:
headers["Content-Type"] = "application/x-www-form-urlencoded"
conn.request(method=method, url=endpoint, headers=headers)
else :
headers["Content-Type"] = "application/json"
conn.request(method=method, url=endpoint, body=params, headers=headers)
response = conn.getresponse()
res = response.read()
return res
def httpGet(host, endpoint, username="", password=""):
return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
endpoint = "/_cat/indices"
indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
indicesList = indicesResult.split("\n")
indexList = []
for indices in indicesList:
if (indices.find("open") > 0):
indexList.append(indices.split()[2])
return indexList
def getSettings(index, host, username="", password=""):
endpoint = "/" + index + "/_settings"
indexSettings = httpGet(host, endpoint, username, password)
print (index + " 原始settings如下:\n" + indexSettings)
settingsDict = json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数默认为0。
number_of_replicas = DEFAULT_REPLICAS
newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
return newSetting
def getMapping(index, host, username="", password=""):
endpoint = "/" + index + "/_mapping"
indexMapping = httpGet(host, endpoint, username, password)
print (index + " 原始mapping如下:\n" + indexMapping)
mappingDict = json.loads(indexMapping)
mappings = json.dumps(mappingDict[index]["mappings"])
newMapping = "\"mappings\" : " + mappings
return newMapping
def createIndexStatement(oldIndexName):
settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
return createstatement
def createIndex(oldIndexName, newIndexName=""):
if (newIndexName == "") :
newIndexName = oldIndexName
createstatement = createIndexStatement(oldIndexName)
print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
endpoint = "/" + newIndexName
createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print ("新索引 " + newIndexName + " 创建结果:" + createResult)
## main
#indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
indexneed = ['traderecord']
#print(indexList)
#print(systemIndex)
for index in indexneed:
if (index.startswith(".")):
systemIndex.append(index)
else :
createIndex(index, index)
if (len(systemIndex) > 0) :
for index in systemIndex:
print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")

4.2、执行Python脚本,创建目标索引

1
/usr/bin/python indiceCreate.py

4.3、查看目标ES的索引是否创建

1
curl http://10.60.2.31:9200/traderecord/_mapping?pretty

提示:脚本简单改造,支持全量同步和指定索引同步

5、迁移全量数据

logstash会占用部分资源,建议在闲置资源侧上部署logstash服务;

5.1、logstash安装与配置

  • 下载logstash,需根据es版本下载对应的logstash
    1
    2
    3
    4
    5
    su - sspusr
    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.tar.gz
    tar xzvf logstash-7.3.2.tar.gz
    cd /home/sspusr/logstash-7.3.2
    mkdir -pv {logs,data} config/conf.d

5.2、全量pipeline文件制作

  • vim /home/sspusr/logstash-7.3.2/config/conf.d/inyusyses_outalies.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    input{
    elasticsearch{
    # 源端ES地址。
    hosts => ["http://192.168.36.59:9200"]
    # 安全集群配置登录用户名密码。
    #user => "xxxxxx"
    #password => "xxxxxx"
    # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。
    index => "traderecord_202203"
    # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。
    #docinfo=>true
    #slices => 5
    #size => 5000
    # 定时任务,以下配置表示每分钟执行一次
    schedule => "* * * * *"
    scroll => "5m"
    docinfo=>true
    size => 5000
    }
    }

    filter {
    # 去掉一些Logstash自己加的字段。
    mutate {
    remove_field => ["@timestamp", "@version"]
    }
    }

    output{
    elasticsearch{
    # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。
    hosts => ["http://10.60.2.31:9200"]
    # 安全集群配置登录用户名密码。
    #user => "elastic"
    #password => "xxxxxx"
    # 目标端索引名称,以下配置表示索引与源端保持一致。
    index => "%{[@metadata][_index]}"
    # 目标端索引type,以下配置表示索引类型与源端保持一致。
    document_type => "%{[@metadata][_type]}"
    # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。
    document_id => "%{[@metadata][_id]}"
    ilm_enabled => false
    manage_template => false
    }
    }

5.3、修改logstash.yml配置文件

  • vim /home/sspusr/logstash-7.3.2/config/logstash.yml
    1
    2
    path.data: /home/sspusr/logstash-7.3.2/data
    path.logs: /home/sspusr/logstash-7.3.2/logs

5.4、启动logstash服务

1
2
3
4
#启动命令
nohup /home/sspusr/logstash-7.3.2/bin/logstash &
#日志目录
/home/sspusr/logstash-7.3.2/logs/*

5.5、查看数据迁移结果

6、迁移增量数据

注意:需增量迁移的索引,必须有时间戳字段,类型为date,且不支持删除数据同步;

6.1、增量pipeline文件制作

6.2、数据追加同步

6.3、启动logstash服务

6.4、查看数据迁移结果

7、验证数据迁移结果

8、其他

-------------本文结束感谢您的阅读-------------
LiGuanCheng wechat
如有问题,请与我微信交流或通过右下角“daovoice”与我联系~。
请我喝一杯咖啡~