import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Delete;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
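/**
* Manages Solr index collections: schema fields, copy fields, config upload and
* collection create / reload / delete.
*
* @author lican
* @date 2018-12-11
*/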
@Component
public class CollectionManager {
private static final Logger LOG = LoggerFactory.getLogger(CollectionManager.class);
/** SolrCloud client that every schema and collection request in this class is sent through. */
@Autowired
private CloudSolrClient solrCloudClient;
/** Config manager used to upload the local config directory. */
@Autowired
private ZkConfigManager zkConfigManager;
/** Local file-system location of the {@code solrconfig} classpath resource. */
private final String configPath = resolveConfigPath();

/**
 * Resolves the {@code solrconfig} classpath resource to a file-system path string.
 *
 * <p>{@link URL#getPath()} returns the raw URL path: percent-escapes are left encoded (a space
 * stays {@code %20}) and on Windows the drive letter is preceded by a slash, neither of which is a
 * usable argument for {@code Paths.get(String)}. For {@code file:} URLs the path is therefore
 * obtained through {@link URL#toURI()}; for any other protocol, or if that conversion fails, the
 * raw URL path is returned exactly as before.
 *
 * @return the resolved path of the {@code solrconfig} resource
 * @throws NullPointerException if the resource is not on the classpath (same exception type as
 *     before, now with a descriptive message)
 */
private String resolveConfigPath() {
    URL resource = Objects.requireNonNull(
            this.getClass().getClassLoader().getResource("solrconfig"),
            "classpath resource 'solrconfig' not found");
    if ("file".equals(resource.getProtocol())) {
        try {
            return Paths.get(resource.toURI()).toString();
        } catch (URISyntaxException | RuntimeException e) {
            // Keep the previous behaviour rather than failing bean construction.
            LOG.warn("cannot convert solrconfig url:{} to a file path, falling back to raw url path", resource, e);
        }
    }
    return resource.getPath();
}
/**
* 获取schemaFields参数
*/
private static ModifiableSolrParams schemaFieldSolrParams = new ModifiableSolrParams();
private static ModifiableSolrParams schemaCopyFieldSolrParams = new ModifiableSolrParams();
private static ModifiableSolrParams schemaUniqueKeySolrParams = new ModifiableSolrParams();
static {
// 是否显示域类型的默认信息
schemaFieldSolrParams.add("showDefaults", "false");
// 是否返回动态域的定义信息
schemaFieldSolrParams.add("includeDynamic", "true");
// 指定返回那些域的定义信息
schemaFieldSolrParams.add("f1", "name,_version_");
schemaCopyFieldSolrParams.add("wt", "json");
// 设置返回sourcefield信息
schemaCopyFieldSolrParams.add("source.fl", "name");
schemaUniqueKeySolrParams.add("wt", "json");
}
/**
 * Creates schema fields in the given collection.
 *
 * <p>Every entry of {@code diffFields} is passed to {@code insertSchemaField}. When
 * {@code diffFields} is empty nothing is created and {@code false} is returned. Any exception
 * raised while creating fields is logged and reported as {@code false}; it is not propagated.
 *
 * <p>NOTE(review): the signature line and the declaration of {@code diffFields} are cut off after
 * {@code List} in this copy of the source (generic type arguments and the rest of the line are
 * missing) - restore them from the original file.
 *
 * @param collection name of the collection whose schema is modified
 * @param schemaParams field definitions requested by the caller (only used for logging in the
 *     visible code)
 * @return {@code true} if all entries of {@code diffFields} were inserted, {@code false} if there
 *     was nothing to insert or an exception occurred
 */
public boolean createFields(String collection, List
LOG.info("CollectionManager.createFields...collection:{},schema:{}", collection, schemaParams);
try {
List
if (CollectionUtils.isEmpty(diffFields)) {
LOG.info("create fields fail,Fields is exist,collection:{},schemaParams:{}", collection, schemaParams);
return false;
}
for (SchemaParam schemaParam : diffFields) {
// NOTE(review): uniqueKey is constructed on every iteration but never used in the visible code.
SchemaRequest.UniqueKey uniqueKey = new SchemaRequest.UniqueKey(schemaUniqueKeySolrParams);
insertSchemaField(collection, schemaParam);
}
LOG.info("[createFields]create success");
return true;
} catch (Exception e) {
LOG.error("[createFields] create fail,collection:{},schema:{}", collection, schemaParams, e);
return false;
}
}
/**
 * Adds a single field definition to the schema of the given collection.
 *
 * <p>The name, type, indexed, stored, required, multiValued and omitNorms values of
 * {@code schemaParam} are copied into an attribute map and sent as a
 * {@code SchemaRequest.AddField} request through the SolrCloud client.
 *
 * @param collection name of the collection whose schema receives the field
 * @param schemaParam definition of the field to add
 * @throws Exception if processing the add-field request fails
 */
private void insertSchemaField(String collection, SchemaParam schemaParam) throws Exception {
    String fieldName = schemaParam.getName();
    // Attributes of the field to add to the collection (allocated once; the original created
    // the map twice and discarded the first instance).
    Map<String, Object> fieldAttributes = new HashMap<>();
    fieldAttributes.put(SchemaParam.NAME, fieldName);
    fieldAttributes.put(SchemaParam.TYPE, schemaParam.getType());
    fieldAttributes.put(SchemaParam.INDEXED, schemaParam.getIndexed());
    fieldAttributes.put(SchemaParam.STORED, schemaParam.getStored());
    fieldAttributes.put(SchemaParam.REQUIRED, schemaParam.getRequired());
    fieldAttributes.put(SchemaParam.MULTIVALUED, schemaParam.getMultiValued());
    fieldAttributes.put(SchemaParam.OMIT_NORMS, schemaParam.getOmitNorms());
    SchemaRequest.AddField addField = new SchemaRequest.AddField(fieldAttributes);
    addField.process(solrCloudClient, collection);
    LOG.info("[insertField]field :{},insert success", fieldName);
}
/**
 * Compares the requested fields with the schema and returns the ones the schema does not have.
 *
 * <p>An entry of {@code schemaParams} is added to the result only when {@code schemaFields} is
 * non-empty and does not contain the entry's name; consequently an empty {@code schemaFields}
 * yields an empty result.
 *
 * <p>NOTE(review): the method header and the declarations of {@code diffSchemas} and
 * {@code schemaFields} are cut off after {@code List} in this copy of the source - restore them
 * from the original file. How {@code schemaFields} is obtained is not visible here.
 *
 * @param collection name of the collection (per the original Javadoc; its use is not visible)
 * @param schemaParams requested field definitions
 * @return the entries of {@code schemaParams} whose names are missing from {@code schemaFields}
 */
private List
List
List
for (SchemaParam schemaParam : schemaParams) {
String fieldName = schemaParam.getName();
// The isNotEmpty guard means nothing is reported as missing when schemaFields is empty.
if (CollectionUtils.isNotEmpty(schemaFields) && !schemaFields.contains(fieldName)) {
diffSchemas.add(schemaParam);
}
}
return diffSchemas;
}
/**
 * Creates a copy-field rule in the collection's schema. Per the original author's note, copy
 * fields can be used to union data: the main parameters are source and dest, and the typical use
 * is combining several fields into one.
 *
 * <p>NOTE(review): this method is incomplete in this copy of the source. The signature and the
 * declaration of {@code destFields} are cut off after {@code List}, and the body ends after
 * {@code NamedList} - restore it from the original file.
 *
 * @param collection name of the collection whose schema is modified
 * @param schemaParams field definitions (only used for logging in the visible code)
 */
public void createCopyFields(String collection, List
LOG.info("create copy fields...collection:{},schema:{}", collection, schemaParams);
List
// NOTE(review): SolrJ's AddCopyField constructor takes (source, dest). Here "union_field" is
// passed as the first argument and destFields as the second - confirm this direction is intended.
SchemaRequest.AddCopyField addCopyField = new SchemaRequest.AddCopyField("union_field", destFields);
try {
SchemaResponse.UpdateResponse response = addCopyField.process(solrCloudClient, collection);
NamedList
/**
 * Compares the requested fields with the existing copy fields and returns the names that are not
 * present yet.
 *
 * <p>NOTE(review): the method header and the declarations of {@code destFields} and
 * {@code sorecesCopyFields} are cut off after {@code List} in this copy of the source - restore
 * them from the original file. How {@code sorecesCopyFields} is obtained is not visible here.
 *
 * @return the names of the {@code schemaParams} entries not contained in
 *     {@code sorecesCopyFields}
 */
private List
List
List
for (SchemaParam schemaParam : schemaParams) {
String fieldName = schemaParam.getName();
// "sorecesCopyFields" is presumably a misspelling of "sourcesCopyFields".
if (!sorecesCopyFields.contains(fieldName)) {
destFields.add(fieldName);
}
}
return destFields;
}
/**
 * Lists all copy fields of the collection's schema.
 *
 * <p>The visible code sends a {@code SchemaRequest.CopyFields} request built from
 * {@code schemaCopyFieldSolrParams} through the SolrCloud client.
 *
 * <p>NOTE(review): this method is incomplete in this copy of the source. The header and the first
 * local declaration are cut off after {@code List}, and the body ends after {@code NamedList} -
 * restore it from the original file.
 *
 * @param collection name of the collection whose schema is queried
 * @return the copy fields (exact element type not visible in this copy)
 */
public List
List
SchemaRequest.CopyFields allCopyFields = new SchemaRequest.CopyFields(schemaCopyFieldSolrParams);
try {
SchemaResponse.CopyFieldsResponse response = allCopyFields.process(solrCloudClient, collection);
NamedList
/**
 * Deletes a field from the schema of a collection by its name.
 *
 * <p>A successful, non-null response is logged together with its request URL. Any exception is
 * logged and not propagated to the caller.
 *
 * @param collection name of the collection whose schema is modified
 * @param fieldName name of the field to delete
 */
public void deleteField(String collection, String fieldName) {
    SchemaRequest.DeleteField request = new SchemaRequest.DeleteField(fieldName);
    try {
        SchemaResponse.UpdateResponse result = request.process(solrCloudClient, collection);
        if (result == null) {
            // Nothing to report when the client hands back no response.
            return;
        }
        LOG.info("[deleteFields]delete success,collection:{},field:{},requestUrl:{}", collection, fieldName,
                result.getRequestUrl());
    } catch (Exception e) {
        LOG.error("[deleteFields]delete fail,collection:{},field:{}", collection, fieldName, e);
    }
}
/**
 * Replaces (overwrites) a field definition in the collection's schema.
 *
 * <p>The visible code always sends the same fixed attribute set - name {@code product_name}, type
 * {@code date}, stored {@code "true"}, omitNorms {@code true} - as a
 * {@code SchemaRequest.ReplaceField} request. Any exception is logged and not propagated.
 *
 * <p>NOTE(review): the signature is cut off after {@code List} in this copy of the source -
 * restore it from the original file. {@code schemaParams} is only referenced in the error log
 * here; confirm whether the hard-coded attributes are placeholder code.
 *
 * @param collection name of the collection whose schema is modified
 */
public void updateField(String collection, List
// Hard-coded replacement definition; not derived from the method arguments.
Map<String, Object> fieldAttributes = new HashMap<>();
fieldAttributes.put("name", "product_name");
fieldAttributes.put("type", "date");
fieldAttributes.put("stored", "true");
fieldAttributes.put("omitNorms", true);
SchemaRequest.ReplaceField replaceField = new SchemaRequest.ReplaceField(fieldAttributes);
try {
replaceField.process(solrCloudClient, collection);
} catch (Exception e) {
LOG.error("[updateFields]update fail,collection:{},schemaParams:{}", collection, schemaParams, e);
}
}
/**
 * Uploads the default config directory ({@code configPath}) through the ZooKeeper config manager,
 * passing the collection name as the second argument of {@code uploadConfigDir}. Per the original
 * author's note, this is meant to be called only when a new index is being added.
 *
 * @param collection name passed to {@code uploadConfigDir} together with the config directory
 * @return {@code true} if the upload completed, {@code false} if an {@link IOException} occurred
 *     (the exception is logged)
 */
public boolean uploadDefaultConfig(String collection) {
    LOG.info("[ready]upload default config ----ready----,collection:{}", collection);
    boolean uploaded = false;
    try {
        zkConfigManager.uploadConfigDir(Paths.get(configPath), collection);
        LOG.info("[success]upload default config ----success----,collection:{}", collection);
        uploaded = true;
    } catch (IOException e) {
        LOG.error("[fail]upload default config ----fail----,collection:{}", collection, e);
    }
    return uploaded;
}
/**
 * Creates a collection (index node) with the given name.
 *
 * <p>The request is built with {@code CollectionAdminRequest.createCollection(collection, 1, 1)}.
 * A missing response, an unsuccessful response, or a {@link SolrServerException} /
 * {@link IOException} is logged and reported as {@code false}.
 *
 * @param collection name of the collection to create
 * @return {@code true} if the response reports success, {@code false} otherwise
 */
public boolean createCollection(String collection) {
    LOG.info("[ready]create collection ----ready----,collection:{}", collection);
    CollectionAdminRequest.Create request = CollectionAdminRequest.createCollection(collection, 1, 1);

    CollectionAdminResponse response;
    try {
        response = request.process(solrCloudClient);
    } catch (SolrServerException | IOException e) {
        LOG.error("[fail]create collection ----fail----,collection:{}", collection, e);
        return false;
    }

    if (response == null) {
        LOG.error("[fail]create collection ----fail----,create response is null,collection:{}", collection);
        return false;
    }
    if (!response.isSuccess()) {
        LOG.error("[fail]create collection ----fail----,error message:{},collection:{}",
                response.getErrorMessages(), collection);
        return false;
    }
    LOG.info("[success]create collection ----success----,collection:{}", collection);
    return true;
}
/**
 * Reloads a collection so that it picks up its current configuration. Per the original author's
 * note, a reload is needed after adding fields and before indexing.
 *
 * <p>A missing response, an unsuccessful response, or a {@link SolrServerException} /
 * {@link IOException} is logged and reported as {@code false}.
 *
 * @param collection name of the collection to reload
 * @return {@code true} if the response reports success, {@code false} otherwise
 */
public boolean reloadCollection(String collection) {
    LOG.info("[ready]reload collection ----ready----,collection:{}", collection);
    CollectionAdminRequest.Reload request = CollectionAdminRequest.reloadCollection(collection);

    CollectionAdminResponse response;
    try {
        response = request.process(solrCloudClient);
    } catch (SolrServerException | IOException e) {
        LOG.error("[fail]reload collection ----fail----,collection:{}", collection, e);
        return false;
    }

    if (response == null) {
        LOG.error("[fail]reload collection ----fail----,reload response is null,collection:{}", collection);
        return false;
    }
    if (!response.isSuccess()) {
        LOG.error("[fail]reload collection ----fail----,error message:{},collection:{}",
                response.getErrorMessages(), collection);
        return false;
    }
    LOG.info("[success]reload collection ----success----,collection:{}", collection);
    return true;
}
/**
 * Deletes a collection (index node) if it exists.
 *
 * <p>Existence is checked with {@link #isExist(String)} first; when the collection is not found
 * only a log entry is written and no delete request is sent.
 *
 * @param collection name of the collection to delete
 * @throws Exception if the existence check or the delete request fails
 */
public void deleteCollection(String collection) throws Exception {
    LOG.info("deleteCollection:{},------ready------", collection);
    if (!isExist(collection)) {
        LOG.info("deleteCollection:{},------fail------,找不到要删除的collection", collection);
        return;
    }
    CollectionAdminRequest.deleteCollection(collection).process(solrCloudClient);
    LOG.info("deleteCollection:{},------success------", collection);
}
/**
 * Checks whether a collection exists, by asking the cluster state obtained from the SolrCloud
 * client's ZooKeeper state reader. The outcome is also written to the log.
 *
 * @param collection name of the collection to look up
 * @return {@code true} if the cluster state has the collection, {@code false} otherwise
 * @throws Exception declared by the signature for callers to handle
 */
public boolean isExist(String collection) throws Exception {
    boolean exists = solrCloudClient.getZkStateReader().getClusterState().hasCollection(collection);
    LOG.info(exists
            ? "CollectionManager.collection:{},is exist"
            : "CollectionManager.collection:{},not exist", collection);
    return exists;
}
/**
 * Lists the names of all fields in the collection's schema.
 *
 * <p>A {@code SchemaRequest.Fields} request built from {@code schemaFieldSolrParams} is sent
 * through the SolrCloud client. For every entry keyed {@code "name"} in each iterated
 * {@code field}, the value's string form is appended to {@code allField} (an empty string when the
 * value is {@code null}). A {@link SolrServerException} or {@link IOException} is logged and not
 * propagated; whatever has been collected so far is returned.
 *
 * <p>NOTE(review): the method header and several declarations ({@code allField}, the response
 * {@code NamedList}, the field list and the outer loop header) are cut off in this copy of the
 * source - restore them from the original file. The original Javadoc listed
 * {@code @throws Exception}; whether the signature declares it is not visible here.
 *
 * @param collection name of the collection whose schema is queried
 * @return the collected field names
 */
public List
List
SchemaRequest.Fields allFields = new SchemaRequest.Fields(schemaFieldSolrParams);
SchemaResponse.FieldsResponse response;
try {
response = allFields.process(solrCloudClient, collection);
NamedList
@SuppressWarnings("unchecked")
List<NamedList
for (NamedList
for (Entry<String, Object> entry : field) {
String key = entry.getKey();
Object val = entry.getValue();
// Only the "name" attribute of each field definition is kept.
if ("name".equals(key)) {
if (null == val) {
allField.add("");
} else {
allField.add(val.toString());
}
}
}
}
} catch (SolrServerException | IOException e) {
LOG.error("Get Schema Fields fail,colleciotn:{}", collection, e);
}
return allField;
}
}