Elasticsearch数据迁移验证

Elasticsearch数据迁移验证

1、前置条件梳理

SAG网络已提前打通,宇信金融云的ES机器可以与阿里的ES机器互访

1.1、本地ES信息整理

ES版本 测试环境IP 待同步索引 主分片 副本分片 存储大小
elasticsearch-7.3.2 192.168.36.59/60/61 traderecord 5 1 3.6gb
  • ES配置文件:elasticsearch.yml(node-1/2/3)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # Listen on all interfaces, default HTTP port.
    network.host: 0.0.0.0
    http.port: 9200
    # Memory locking and the system-call filter are both disabled here.
    bootstrap.memory_lock: false
    bootstrap.system_call_filter: false
    path.data: /opt/test/data
    path.logs: /opt/test/logs
    node.max_local_storage_nodes: 1
    #discovery.zen.ping.unicast.hosts: ["127.0.0.1"]
    # Bootstrap the cluster with node-1 as the initial master candidate.
    cluster.initial_master_nodes: ["node-1"]
    # Allow cross-origin HTTP requests from any origin.
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    cluster.name: yusys-test
    # NOTE(review): node.name presumably differs on node-2/node-3 — this file is the node-1 copy.
    node.name: node-1
    node.master: true
    node.data: true
    # Seed hosts: the three cluster members.
    discovery.zen.ping.unicast.hosts: ["192.168.36.59", "192.168.36.60", "192.168.36.61"]

1.2、阿里云ES信息整理

ES版本 测试环境IP 待同步索引 主分片 副本分片 存储大小
elasticsearch-7.3.2 10.60.2.31(外) traderecord 5 1 3.6gb

1.3、logstash的配置与使用

logstash会占用部分资源,建议在阿里云环境上部署logstash服务;

1.3.1、安装并创建所需目录

  • 下载logstash,需根据es版本下载对应的logstash
    1
    2
    3
    4
    5
    su - sspusr
    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.tar.gz
    tar xzvf logstash-7.3.2.tar.gz
    cd /home/sspusr/logstash-7.3.2
    mkdir -pv {logs,data} config/conf.d

1.3.3、创建pipelines文件

  • vim /home/sspusr/logstash-7.3.2/config/conf.d/inyusyses_outalies.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    input{
    elasticsearch{
    # Source ES cluster address.
    hosts => ["http://192.168.36.59:9200"]
    # Login credentials for a security-enabled cluster.
    #user => "xxxxxx"
    #password => "xxxxxx"
    # Indices to migrate; separate multiple index names with commas.
    index => "traderecord_202203"
    # The three commented entries below relate to thread count, batch size
    # and Logstash JVM settings; the defaults are fine.
    #docinfo=>true
    #slices => 5
    #size => 5000
    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
    scroll => "5m"
    docinfo=>true
    size => 5000
    }
    }

    filter {
    # Strip fields that Logstash adds by itself.
    mutate {
    remove_field => ["@timestamp", "@version"]
    }
    }

    output{
    elasticsearch{
    # Target ES address (shown on the Alibaba Cloud Elasticsearch instance's basic-info page).
    hosts => ["http://10.60.2.31:9200"]
    # Login credentials for a security-enabled cluster.
    #user => "elastic"
    #password => "xxxxxx"
    # Target index name; this setting keeps it identical to the source index.
    index => "%{[@metadata][_index]}"
    # Target index type; this setting keeps it identical to the source type.
    document_type => "%{[@metadata][_type]}"
    # Target document id; delete this line if the original ids need not be
    # preserved — letting ES generate ids performs better.
    document_id => "%{[@metadata][_id]}"
    ilm_enabled => false
    manage_template => false
    }
    }

1.3.4、修改logstash.yml配置文件

  • vim /home/sspusr/logstash-7.3.2/config/logstash.yml
    1
    2
    path.data: /home/sspusr/logstash-7.3.2/data
    path.logs: /home/sspusr/logstash-7.3.2/logs

1.4、迁移索引元数据(设置和映射)

  • 在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。

1.4.1、创建indiceCreate.py迁移脚本

提示:脚本默认是将源ES的所有索引进行迁移,如迁移指定索引需改造脚本

1
2
3
4
5
6
7
## Shard count defaults to the source index's value; adjust the number to
## match the actual production node count if needed.
#number_of_shards = 6
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]

## Replica count defaults to 0; adjust to match the actual production node count.
#number_of_replicas = 1
number_of_replicas = DEFAULT_REPLICAS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
vim indiceCreate.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## Source cluster host.
oldClusterHost = "192.168.36.59:9200"
## Source cluster username; may be empty.
oldClusterUserName = ""
## Source cluster password; may be empty.
oldClusterPassword = ""
## Target cluster host (shown on the Alibaba Cloud Elasticsearch instance's basic-info page).
newClusterHost = "10.60.2.31:9200"
## Target cluster username.
newClusterUser = ""
## Target cluster password.
newClusterPassword = ""
## Replica count applied to every re-created index.
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send an HTTP request to an Elasticsearch node and return the raw response body.

    method   -- HTTP verb, e.g. "GET", "PUT", "POST"
    host     -- "ip:port" string of the target cluster
    endpoint -- request path, e.g. "/myindex/_settings"
    params   -- request body (JSON string) for non-GET requests
    username/password -- optional HTTP basic-auth credentials; empty means no auth

    NOTE: this is Python 2 code (httplib / base64.encodestring).
    """
    conn = httplib.HTTPConnection(host)
    headers = {}
    if username != "":
        # Build the Basic Authorization header. encodestring appends a
        # trailing newline that must be stripped.
        # (Fix: removed a leftover no-op 'Hello {name}...'.format(...) line
        # that did nothing, and a stray trailing semicolon.)
        base64string = base64.encodestring(
            '{username}:{password}'.format(username=username, password=password)
        ).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else:
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    """Issue a GET request against *host* and return the raw response body."""
    return httpRequest(method="GET", host=host, endpoint=endpoint,
                       params="", username=username, password=password)
def httpPost(host, endpoint, params, username="", password=""):
    """Issue a POST request with body *params* and return the raw response body."""
    return httpRequest(method="POST", host=host, endpoint=endpoint,
                       params=params, username=username, password=password)
def httpPut(host, endpoint, params, username="", password=""):
    """Issue a PUT request with body *params* and return the raw response body."""
    return httpRequest(method="PUT", host=host, endpoint=endpoint,
                       params=params, username=username, password=password)
def getIndices(host, username="", password=""):
    """Return the names of all open indices on *host*.

    Parses the plain-text output of the _cat/indices API: only rows whose
    status column reads "open" are kept, and the index name is the third
    whitespace-separated column.
    """
    endpoint = "/_cat/indices"
    # Bug fix: the original ignored its arguments and always queried the
    # source-cluster globals (oldClusterHost etc.); honour the parameters
    # so the function works against either cluster.
    indicesResult = httpGet(host, endpoint, username, password)
    indexList = []
    for row in indicesResult.split("\n"):
        if row.find("open") > 0:
            indexList.append(row.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    """Fetch *index*'s settings from *host* and build the "settings" JSON
    fragment for the re-created index: the shard count is preserved from
    the source, the replica count is forced to DEFAULT_REPLICAS.
    """
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print (index + " 原始settings如下:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    # Keep the source shard count; replicas are overridden with the default.
    shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    return "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (
        shards, DEFAULT_REPLICAS)
def getMapping(index, host, username="", password=""):
    """Fetch *index*'s mapping from *host* and return it as a "mappings" JSON fragment."""
    indexMapping = httpGet(host, "/" + index + "/_mapping", username, password)
    print (index + " 原始mapping如下:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    return "\"mappings\" : " + json.dumps(mappingDict[index]["mappings"])
def createIndexStatement(oldIndexName):
    """Assemble the JSON body (settings + mappings of the source index)
    used to create the index on the target cluster.
    """
    settingsPart = getSettings(oldIndexName, oldClusterHost,
                               oldClusterUserName, oldClusterPassword)
    mappingsPart = getMapping(oldIndexName, oldClusterHost,
                              oldClusterUserName, oldClusterPassword)
    return "{\n" + str(settingsPart) + ",\n" + str(mappingsPart) + "\n}"
def createIndex(oldIndexName, newIndexName=""):
    """Create *newIndexName* on the target cluster, copying the source
    index's settings and mappings. An empty *newIndexName* reuses the
    source index's name.
    """
    if not newIndexName:
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
    createResult = httpPut(newClusterHost, "/" + newIndexName, createstatement,
                           newClusterUser, newClusterPassword)
    print ("新索引 " + newIndexName + " 创建结果:" + createResult)
## main: re-create each requested index on the target cluster.
## (Uncommenting getIndices(oldClusterHost, ...) would migrate every
## open index on the source cluster instead of a fixed list.)
#indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
indexneed = ['traderecord']
systemIndex = []
for index in indexneed:
    # Names starting with "." are treated as system indices: collect them
    # instead of re-creating them.
    if index.startswith("."):
        systemIndex.append(index)
    else:
        createIndex(index, index)
if len(systemIndex) > 0:
    for index in systemIndex:
        print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")

1.4.2、执行Python脚本,创建目标索引

1
/usr/bin/python indiceCreate.py

1.4.3、查看目标ES的索引是否创建

1
curl http://10.60.2.31:9200/traderecord/_mapping?pretty

2、启动logstash服务

1
2
3
4
#启动命令
nohup /home/sspusr/logstash-7.3.2/bin/logstash &
#日志目录
/home/sspusr/logstash-7.3.2/logs/*

2.1、查看数据迁移结果

3、注意事项

  1. 趣街所使用的ES索引中,所有跟时间相关的字段,均为(type:text)类型,这就导致了阿里提供的logstash增量迁移方法,无法根据时间范围查询数据,需要有type:date的时间字段才可以进行范围定时抽取数据;
  2. 对logstash的全量pipeline配置,进行调整,将增量中提到的定时任务添加到全量配置里,实现每分钟实时增量同步,logstash会记录checkpoint,不会出现重复数据;
  3. 索引的mapping及setting使用阿里的python脚本进行同步,目前已对脚本做了适配改造,支持同步指定索引
  4. 阿里官方迁移手册:通过Logstash将自建Elasticsearch数据全量或增量迁移至阿里云
-------------本文结束感谢您的阅读-------------
LiGuanCheng wechat
如有问题,请与我微信交流或通过右下角“daovoice”与我联系~。
请我喝一杯咖啡~