RocketMQ之Pull消费者客户端启动

Stella981
• 阅读 938

Pull消费者客户端(主动拉取消息的消费者)即构造了DefaultMQPullConsumer对象,DefaultMQPullConsumer继承了ClientConfig类。我们先看其构造方法

[java] view plain copy
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {  
    this.consumerGroup = consumerGroup;  
    defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);  
}

这里只是简单设置了consumerGroup消费者组名,表示消费者属于哪个组。构造了DefaultMQPullConsumerImpl的实例,DefaultMQPullConsumerImpl的构造方法很简单,只是绑定了DefaultMQPullConsumer、配置了传入的rpcHook。

DefaultMQPullConsumer内部封装了DefaultMQPullConsumerImpl,其中还维护这一些配置信息。这里维护着消费者订阅的topic集合。

[java] view plain copy
private Set<String> registerTopics = new HashSet<String>();

整个消费者客户端的启动,调用了DefaultMQPullConsumer的start()方法,内部直接调用DefaultMQPullConsumerImpl的start()方法,这个start方法加了synchronized修饰。

[java] view plain copy
    public synchronized void start() throws MQClientException {  
        switch (this.serviceState) {  
            case CREATE_JUST:  
                this.serviceState = ServiceState.START_FAILED;  
  
                this.checkConfig();  
  
                this.copySubscription();  
  
                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {  
                    this.defaultMQPullConsumer.changeInstanceNameToPID();  
                }  
  
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer  
                                                                                                                , this.rpcHook);  
  
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());  
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());  
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());  
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);  
  
                this.pullAPIWrapper = new PullAPIWrapper(  
                    mQClientFactory,  
                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());  
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);  
  
                if (this.defaultMQPullConsumer.getOffsetStore() != null) {  
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();  
                } else {  
                    switch (this.defaultMQPullConsumer.getMessageModel()) {  
                        case BROADCASTING:  
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        case CLUSTERING:  
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        default:  
                            break;  
                    }  
                    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);  
                }  
  
                this.offsetStore.load();  
  
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);  
                if (!registerOK) {  
                    this.serviceState = ServiceState.CREATE_JUST;  
  
                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()  
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl  
                                                                                            .GROUP_NAME_DUPLICATE_URL), null);  
                }  
  
                mQClientFactory.start();  
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());  
                this.serviceState = ServiceState.RUNNING;  
                break;  
            case RUNNING:  
            case START_FAILED:  
            case SHUTDOWN_ALREADY:  
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "  
                    + this.serviceState  
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),  
                    null);  
            default:  
                break;  
        }  
  
    }

一开始的serverState的状态自然为CREAT_JUST,调用checkConfig(),其中先是对ConsumerGroup进行验证,非空,合法(符合正则规则,且长度不超过配置最大值),且不为默认值(防止消费者集群名冲突),然后对消费者消息模式、消息队列分配算法进行非空、合法校验。

关于消费者消息模式有BroadCasting(广播)跟Clustering(集群)两种、默认是Clustering(集群)配置在DefaultMQPullConsumer中。关于消费者的消息分配算法,在DefaultMQPullConsumer中实现有默认的消息分配算法,allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();(平均分配算法)。其实现了AllocateMessageQueueStrategy接口,重点看其实现的allocate()方法。

[java] view plain copy
@Override  
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,  
    List<String> cidAll) {  
    if (currentCID == null || currentCID.length() < 1) {  
        throw new IllegalArgumentException("currentCID is empty");  
    }  
    if (mqAll == null || mqAll.isEmpty()) {  
        throw new IllegalArgumentException("mqAll is null or mqAll empty");  
    }  
    if (cidAll == null || cidAll.isEmpty()) {  
        throw new IllegalArgumentException("cidAll is null or cidAll empty");  
    }  
  
    List<MessageQueue> result = new ArrayList<MessageQueue>();  
    if (!cidAll.contains(currentCID)) {  
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",  
            consumerGroup,  
            currentCID,  
            cidAll);  
        return result;  
    }  
  
    int index = cidAll.indexOf(currentCID);  
    int mod = mqAll.size() % cidAll.size();  
    int averageSize =  
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()  
            + 1 : mqAll.size() / cidAll.size());  
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;  
    int range = Math.min(averageSize, mqAll.size() - startIndex);  
    for (int i = 0; i < range; i++) {  
        result.add(mqAll.get((startIndex + i) % mqAll.size()));  
    }  
    return result;  
}

传入的参数有当前消费者id,所有消息队列数组,以及当前所有消费者数组。先简单验证非空,再通过消费者数组大小跟消息队列大小根据平均算法算出当前消费者该分配哪些消息队列集合。逻辑不难。RocketMQ还提供了循环平均、一致性哈希、配置分配等算法,这里默认采用平均分配。

我们再回到DefaultMQPullConsumerImpl的start()方法,checkConfig后,调用copySubscription()方法,将配置在DefaultMQPullConsumer中的topic信息构造成并构造成subscriptionData数据结构,以topic为key以subscriptionData为value以键值对形式存到rebalanceImpl的subscriptionInner中。

[java] view plain copy
private void copySubscription() throws MQClientException {  
    try {  
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();  
        if (registerTopics != null) {  
            for (final String topic : registerTopics) {  
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),  
                    topic, SubscriptionData.SUB_ALL);  
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);  
            }  
        }  
    } catch (Exception e) {  
        throw new MQClientException("subscription exception", e);  
    }  
}

接下来从MQCLientManager中得到MQClient的实例,这个步骤跟生产者客户端相同。

再往后是对rebalanceImpl的配置,我们重点看下rebalanceImpl,它是在DefaultMQPullConsumerImpl成员中直接构造private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);即在DefaultMQPullConsumerImpl初始化的时候构造。接下来对其消费者组名、消息模式(默认集群)、队列分配算法(默认平均分配)、消费者客户端实例进行配置,配置信息都是从DefaultMQPullConsumer中取得。

[java] view plain copy
public abstract class RebalanceImpl {  
    protected static final Logger log = ClientLogger.getLog();  
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);  
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =  
        new ConcurrentHashMap<String, Set<MessageQueue>>();  
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =  
        new ConcurrentHashMap<String, SubscriptionData>();  
    protected String consumerGroup;  
    protected MessageModel messageModel;  
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;  
    protected MQClientInstance mQClientFactory;

接下来构造了PullAPIWrapper,仅仅调用其构造方法,简单的配置下

[java] view plain copy
public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {  
    this.mQClientFactory = mQClientFactory;  
    this.consumerGroup = consumerGroup;  
    this.unitMode = unitMode;  
}

然后初始化消费者的offsetStore,offset即偏移量,可以理解为消费进度,这里根据不同的消息模式来选择不同的策略。如果是广播模式,那么所有消费者都应该收到订阅的消息,那么每个消费者只应该自己消费的消费队列的进度,那么需要把消费进度即offsetStore存于本地采用LocalFileOffsetStroe,相反的如果是集群模式,那么集群中的消费者来平均消费消息队列,那么应该把消费进度存于远程采用RemoteBrokerOffsetStore。然后调用相应的load方法加载。

之后将当前消费者注册在MQ客户端实例上之后,调用MQClientInstance的start()方法,启动消费者客户端。

[java] view plain copy
    public void start() throws MQClientException {  
  
        synchronized (this) {  
            switch (this.serviceState) {  
                case CREATE_JUST:  
                    this.serviceState = ServiceState.START_FAILED;  
                    // If not specified,looking address from name server  
                    if (null == this.clientConfig.getNamesrvAddr()) {  
                        this.mQClientAPIImpl.fetchNameServerAddr();  
                    }  
                    // Start request-response channel  
                    this.mQClientAPIImpl.start();  
                    // Start various schedule tasks  
                    this.startScheduledTask();  
                    // Start pull service  
                    this.pullMessageService.start();  
                    // Start rebalance service  
                    this.rebalanceService.start();  
                    // Start push service  
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);  
                    log.info("the client factory [{}] start OK", this.clientId);  
                    this.serviceState = ServiceState.RUNNING;  
                    break;  
                case RUNNING:  
                    break;  
                case SHUTDOWN_ALREADY:  
                    break;  
                case START_FAILED:  
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed."  
                                                                                                                                , null);  
                default:  
                    break;  
            }  
        }  
    }

看到这里应该很熟悉,跟生产者客户端这里是同一段代码,无非解析路由消息并完成路由消息的配置,启动netty客户端,启动定时任务(定时更新从名称服务器获取路由信息更新本地路由信息,心跳,调整线程数量),后面启动pull server、rebalance service、push service最后把serviceState状态设为Running表示客户端启动。

我们在这里重点看下RebalanceService的启动。下面贴出的是RebalanceService的run()方法。

[java] view plain copy
@Override  
public void run() {  
    log.info(this.getServiceName() + " service started");  
  
    while (!this.isStopped()) {  
        this.waitForRunning(waitInterval);  
        this.mqClientFactory.doRebalance();  
    }  
  
    log.info(this.getServiceName() + " service end");  
}

可以看到,只要这个线程没有被停止(客户端没关闭),会一直循环调用客户端的doRebalance()方法。

[java] view plain copy
public void doRebalance() {  
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {  
        MQConsumerInner impl = entry.getValue();  
        if (impl != null) {  
            try {  
                impl.doRebalance();  
            } catch (Throwable e) {  
                log.error("doRebalance exception", e);  
            }  
        }  
    }  
}  

MQClientInstance遍历consumerTable(之前注册的时候以consumerGroup为key,以消费者客户端DefaultMQPullConsumerImpl为value存入consumerTable中)中的每个元素,循环调用其元素的doRebalance()方法。那我们看DefaultMQPullConsumerImpl的doRebalance方法。

1 [java] view plain copy
2 @Override  
3 public void doRebalance() {  
4     if (this.rebalanceImpl != null) {  
5         this.rebalanceImpl.doRebalance(false);  
6     }  
7 }

直接调用了rebalanceImpl的doRebalance方法

[java] view plain copy
public void doRebalance(final boolean isOrder) {  
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();  
    if (subTable != null) {  
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {  
            final String topic = entry.getKey();  
            try {  
                this.rebalanceByTopic(topic, isOrder);  
            } catch (Throwable e) {  
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
                    log.warn("rebalanceByTopic Exception", e);  
                }  
            }  
        }  
    }  
  
    this.truncateMessageQueueNotMyTopic();  
}  

可以看到先得到subTable即subscriptionInner,之前根据配置的每个topic生成的SubscriptionData数据结构的map。先遍历该map,得到每个topic,针对每个topic调用rebalanceByTopic()

 1 [java] view plain copy
 2     private void rebalanceByTopic(final String topic, final boolean isOrder) {  
 3         switch (messageModel) {  
 4             case BROADCASTING: {  
 5                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
 6                 if (mqSet != null) {  
 7                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);  
 8                     if (changed) {  
 9                         this.messageQueueChanged(topic, mqSet, mqSet);  
10                         log.info("messageQueueChanged {} {} {} {}",  
11                             consumerGroup,  
12                             topic,  
13                             mqSet,  
14                             mqSet);  
15                     }  
16                 } else {  
17                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
18                 }  
19                 break;  
20             }  
21             case CLUSTERING: {  
22                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
23                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);  
24                 if (null == mqSet) {  
25                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
26                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
27                     }  
28                 }  
29   
30                 if (null == cidAll) {  
31                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);  
32                 }  
33   
34                 if (mqSet != null && cidAll != null) {  
35                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();  
36                     mqAll.addAll(mqSet);  
37   
38                     Collections.sort(mqAll);  
39                     Collections.sort(cidAll);  
40   
41                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;  
42   
43                     List<MessageQueue> allocateResult = null;  
44                     try {  
45                         allocateResult = strategy.allocate(  
46                             this.consumerGroup,  
47                             this.mQClientFactory.getClientId(),  
48                             mqAll,  
49                             cidAll);  
50                     } catch (Throwable e) {  
51                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",  
52                              strategy.getName(), e);  
53                         return;  
54                     }  
55   
56                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();  
57                     if (allocateResult != null) {  
58                         allocateResultSet.addAll(allocateResult);  
59                     }  
60   
61                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);  
62                     if (changed) {  
63                         log.info(  
64                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}  
65                         , cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",  
66                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),  
67                             allocateResultSet.size(), allocateResultSet);  
68                         this.messageQueueChanged(topic, mqSet, allocateResultSet);  
69                     }  
70                 }  
71                 break;  
72             }  
73             default:  
74                 break;  
75         }  
76     }

我们先重点关注集群模式下,先得到topic的本地路由信息,再通过topic跟这个消费者的组名,调用netty客户端的同步网络访问topic指定的broker,从broker端得到与其连接的且是指定消费组名下订阅指定topic的消费者id的集合。然后采用默认的分配算法的allocate()进行队列给消费者平均分配。然后调用updateProcessQueueTableInRebalance()方法判断是否重新队列分配。

 1 [java] view plain copy
 2 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,  
 3     final boolean isOrder) {  
 4     boolean changed = false;  
 5   
 6     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();  
 7     while (it.hasNext()) {  
 8         Entry<MessageQueue, ProcessQueue> next = it.next();  
 9         MessageQueue mq = next.getKey();  
10         ProcessQueue pq = next.getValue();  
11   
12         if (mq.getTopic().equals(topic)) {  
13             if (!mqSet.contains(mq)) {  
14                 pq.setDropped(true);  
15                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
16                     it.remove();  
17                     changed = true;  
18                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);  
19                 }  
20             } else if (pq.isPullExpired()) {  
21                 switch (this.consumeType()) {  
22                     case CONSUME_ACTIVELY:  
23                         break;  
24                     case CONSUME_PASSIVELY:  
25                         pq.setDropped(true);  
26                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
27                             it.remove();  
28                             changed = true;  
29                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",  
30                                 consumerGroup, mq);  
31                         }  
32                         break;  
33                     default:  
34                         break;  
35                 }  
36             }  
37         }  
38     }  
39   
40     List<PullRequest> pullRequestList = new ArrayList<PullRequest>();  
41     for (MessageQueue mq : mqSet) {  
42         if (!this.processQueueTable.containsKey(mq)) {  
43             if (isOrder && !this.lock(mq)) {  
44                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);  
45                 continue;  
46             }  
47   
48             this.removeDirtyOffset(mq);  
49             ProcessQueue pq = new ProcessQueue();  
50             long nextOffset = this.computePullFromWhere(mq);  
51             if (nextOffset >= 0) {  
52                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);  
53                 if (pre != null) {  
54                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);  
55                 } else {  
56                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);  
57                     PullRequest pullRequest = new PullRequest();  
58                     pullRequest.setConsumerGroup(consumerGroup);  
59                     pullRequest.setNextOffset(nextOffset);  
60                     pullRequest.setMessageQueue(mq);  
61                     pullRequest.setProcessQueue(pq);  
62                     pullRequestList.add(pullRequest);  
63                     changed = true;  
64                 }  
65             } else {  
66                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);  
67             }  
68         }  
69     }  
70   
71     this.dispatchPullRequest(pullRequestList);  
72   
73     return changed;  
74 }

先遍历processQueueTable,看其topic下的该处理消息队列是否还是应该处理,由于新分配之后,消息队列可能会改变,所以原该处理的消息队列可能没必要处理,因此没必要处理的消息队列移除。当然也有可能多出需要处理的消息队列,于是需要建立其与processQueue的对应关系,先调用computerPullFromWhere得到该条消息下次拉取数据的位置,在RebalancePullImpl中实现了该方法直接返回0,把该处理的mq封装成pq后,更新到processQueueTable中。若有更新,无论是增加还是删除,则changed都设为true。(这个地方讲的有点模糊,他是客户端pull与push区别的关键,实际上push不过是在pull之上封装了下操作,后面我们会重新回来分析。)

方法返回后,如果changed为true,会调用messageQueueChanged方法来通知配置在DefaultMQPullConsumer中的相关messageQueueListener,我们可以看到RebalancePullImpl中的实现。

 1 [java] view plain copy
 2 @Override  
 3 public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {  
 4     MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();  
 5     if (messageQueueListener != null) {  
 6         try {  
 7             messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);  
 8         } catch (Throwable e) {  
 9             log.error("messageQueueChanged exception", e);  
10         }  
11     }  
12 }

广播模式则比较简单,由于所有消费者都要处理,少了队列分配这个步骤。

本文转载自:https://blog.csdn.net/panxj856856/article/details/80725630

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
2小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(