Problem: How to sync more than 10,000 records from Elasticsearch to a MySQL database. A regular search is capped at 10,000 hits by Elasticsearch's default index.max_result_window, so plain from/size pagination won't get you past that limit.
Solution:
1) Use the Scroll API to get all the records
2) Keep appending them to a CSV file
3) Import the CSV file into MySQL
Let's use the Scroll API. But before that, we need a method to extract the data from the ES response object; for that, I used array_pluck.
// Pluck a single key from every element of an array
// (works for both objects and associative arrays).
function array_pluck($array, $key) {
    return array_map(function ($v) use ($key) {
        return is_object($v) ? $v->$key : $v[$key];
    }, $array);
}
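A quick illustration of what array_pluck does with a hits-shaped array (the sample data here is made up):

// Hypothetical sample shaped like an ES hits array.
$hits = array(
    array('_source' => array('id' => 1, 'name' => 'alpha')),
    array('_source' => array('id' => 2, 'name' => 'beta')),
);
$sources = array_pluck($hits, '_source');
// $sources => [['id' => 1, 'name' => 'alpha'], ['id' => 2, 'name' => 'beta']]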
And to convert the JSON to CSV, I used the jsonToCsv function. Credits: jakebathman.
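The credited helper isn't reproduced in this post; a minimal sketch with the same call signature (an array of associative arrays in, a CSV file path out, header row taken from the first row's keys) might look like this:

// Minimal sketch, not jakebathman's original implementation:
// write an array of associative arrays to $csvPath, header row first.
function jsonToCsv($rows, $csvPath) {
    $fp = fopen($csvPath, 'w');
    if (!empty($rows)) {
        // Header row from the keys of the first record.
        fputcsv($fp, array_keys($rows[0]));
        foreach ($rows as $row) {
            fputcsv($fp, $row);
        }
    }
    fclose($fp);
}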
The method below fetches the documents in chunks of 100, appends each chunk to the CSV file, and closes the file once all scrolling is done.
private function fetch_the_es_data_with_scroll_dump_csv($index_name) {
    $params = array(
        "scroll" => "5m",   // keep the scroll context alive for 5 minutes
        "size" => 100,      // fetch documents in chunks of 100
        "index" => $index_name,
        "_source_excludes" => array("@version", "@timestamp")
    );
    $client = $this->elastic_obj;

    // The initial search returns the first batch plus a scroll_id.
    $docs = $client->search($params);
    $scroll_id = $docs['_scroll_id'];
    $tablename = $params['index'];

    // Write the first batch through jsonToCsv, which also emits the header row.
    $source_data = array_pluck($docs['hits']['hits'], '_source');
    $this->csvFileObject = "./ForSql_importCSV/" . $tablename . ".csv";
    jsonToCsv($source_data, $this->csvFileObject);

    // Open the CSV once in append mode for all remaining batches.
    $fp = fopen($this->csvFileObject, 'a');
    while (true) {
        $response = $client->scroll(
            array(
                "scroll_id" => $scroll_id,
                "scroll" => "5m"
            )
        );
        if (count($response['hits']['hits']) > 0) {
            foreach ($response['hits']['hits'] as $fields) {
                fputcsv($fp, $fields['_source']);
            }
            // Each response can carry a fresh scroll_id; keep it current.
            $scroll_id = $response['_scroll_id'];
        } else {
            // No hits left: scrolling is complete, close the file.
            fclose($fp);
            echo $tablename;
            echo "<br>All done scrolling over data<br>";
            break;
        }
    }
}
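One optional refinement, not in the original: once scrolling is finished you can release the scroll context instead of letting it live out the 5m timeout. The parameter shape varies across elasticsearch-php versions; recent ones expect the scroll_id in the request body, so treat this as a sketch to drop in just before the fclose:

// Optional cleanup (sketch): free the scroll context early rather than
// waiting for the "5m" keep-alive to expire. In recent elasticsearch-php
// versions the scroll_id goes in the body.
$client->clearScroll(array(
    "body" => array("scroll_id" => $scroll_id)
));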
Let's import the CSV files into MySQL.
Make sure local infile import is enabled; use the queries below:
SHOW GLOBAL VARIABLES LIKE 'local_infile';
SET GLOBAL local_infile = 1;
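The server-side variable is only half of it: the connection itself must also allow LOAD DATA LOCAL INFILE. With PDO that is set at construction time via PDO::MYSQL_ATTR_LOCAL_INFILE (the DSN and credentials below are placeholders):

// The PDO connection must also permit LOCAL INFILE; host, database name
// and credentials here are placeholders.
$pdo = new PDO(
    'mysql:host=localhost;dbname=mydb;charset=utf8mb4',
    'user',
    'password',
    array(PDO::MYSQL_ATTR_LOCAL_INFILE => true)
);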
Now store the data in the DB. As a recap of the two helpers used here: array_pluck extracts the _source objects from the hits, and jsonToCsv converts the JSON data to CSV.
The SQL query will look something like:
LOAD DATA LOCAL INFILE 'http://0.0.0.0:8082//ForSql_importCSV/custom_carrier_post.csv' INTO TABLE custom_carrier_post FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 ROWS
private function check_db_for_index($data)
{
    if (empty($data['hits']['hits'])) {
        echo "no index found";
        return;
    }

    // The index name doubles as the MySQL table name.
    $tablename = $data['hits']['hits'][0]['_index'];

    // Extract the _source objects and write them out as CSV.
    $source_data = array_pluck($data['hits']['hits'], '_source');
    jsonToCsv($source_data, "./ForSql_importCSV/" . $tablename . ".csv");
    echo $tablename;
    echo "<br>";

    // Build the LOAD DATA statement; the CSV path must be readable by the
    // MySQL client, and IGNORE 1 ROWS skips the header row jsonToCsv wrote.
    $folder = $this->home_url('/ForSql_importCSV/');
    $sqlquery = "LOAD DATA LOCAL INFILE '" . $folder . $tablename . ".csv'
        INTO TABLE " . $tablename . "
        FIELDS TERMINATED BY ','
        OPTIONALLY ENCLOSED BY '\"'
        LINES TERMINATED BY '\n'
        IGNORE 1 ROWS";
    echo "<br>";
    echo $sqlquery;
    $this->pdo_conn->exec($sqlquery);
}
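A hypothetical call site for check_db_for_index, assuming a search response for the index is already at hand (elastic_obj is the same client used above):

// Hypothetical usage from inside the same class: fetch a result set
// (under the 10,000-hit limit) and hand it to check_db_for_index, which
// writes the CSV and runs the LOAD DATA import.
$data = $this->elastic_obj->search(array(
    'index' => 'custom_carrier_post',
    'size'  => 1000
));
$this->check_db_for_index($data);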