Skip to content

Commit

Permalink
Merge pull request #17 from MyCATApache/1.6
Browse files Browse the repository at this point in the history
update
  • Loading branch information
magicdoom authored Oct 2, 2017
2 parents 7ae19a0 + 8cf5282 commit c905aae
Show file tree
Hide file tree
Showing 115 changed files with 5,118 additions and 1,256 deletions.
4 changes: 2 additions & 2 deletions README_Chinese.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
### 官网:[http://www.mycat.org.cn](http://www.mycat.org.cn)

### github:[https://github.com/MyCATApache](https://github.com/MyCATApache)
##### 入门: [zh-CN: https://github.com/MyCATApache/Mycat-doc/blob/master/MyCat_In_Action_%E4%B8%AD%E6%96%87%E7%89%88.doc] [English:https://github.com/MyCATApache/Mycat-doc/tree/master/en]
##### 入门: [zh-CN: https://github.com/MyCATApache/Mycat-doc/blob/master/history/MyCat_In_Action_%E4%B8%AD%E6%96%87%E7%89%88.doc] [English:https://github.com/MyCATApache/Mycat-doc/tree/master/en]

什么是Mycat?简单的说,Mycat就是:

Expand Down Expand Up @@ -56,4 +56,4 @@ github上面的Mycat-download项目是编译好的二进制安装包 [https://gi

##### 文档:

github上面的Mycat-doc项目是相关文档 [https://github.com/MyCATApache/Mycat-doc](https://github.com/MyCATApache/Mycat-doc)
github上面的Mycat-doc项目是相关文档 [https://github.com/MyCATApache/Mycat-doc](https://github.com/MyCATApache/Mycat-doc)
10 changes: 7 additions & 3 deletions conf/dnindex.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#update
#Mon Oct 19 16:26:50 CST 2015
#Mon Apr 24 16:41:54 CST 2017
dh1=0
jdbchost=0
dataHost2=0
localhost1=0
localhost4=0
dataHost1=0
localhost3=0
localhost2=0
localhost1=0
jdbclhost=0
jdbchost=0
16 changes: 15 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.mycat</groupId>
<artifactId>Mycat-server</artifactId>
<version>1.6.5-BETA</version>
<version>1.6.5-release</version>
<packaging>jar</packaging>
<name>Mycat-server</name>
<description>The project of Mycat-server</description>
Expand Down Expand Up @@ -225,6 +225,12 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.9.Final</version>
</dependency>
</dependencies>


Expand Down Expand Up @@ -490,6 +496,14 @@
<name>set.default.REPO_DIR</name>
<value>lib</value>
</property>
<property>
<name>wrapper.logfile.maxsize</name>
<value>512m</value>
</property>
<property>
<name>wrapper.logfile.maxfiles</name>
<value>30</value>
</property>
<property>
<name>wrapper.logfile</name>
<value>logs/wrapper.log</value>
Expand Down
56 changes: 42 additions & 14 deletions src/main/assembly/bin/init_zk_data.sh
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
#!/bin/bash

echo "check JAVA_HOME & java"
JAVA_CMD=$JAVA_HOME/bin/java
MYCAT_HOME="$(dirname `readlink -f $0`)/.."
MAIN_CLASS=io.mycat.config.loader.zkprocess.xmltozk.XmltoZkMain
if [ ! -d "$JAVA_HOME" ]; then
echo ---------------------------------------------------
echo WARN: JAVA_HOME environment variable is not set.
echo ---------------------------------------------------
JAVA_CMD=java

JAVA_CMD=""

#function log_info <msg>
#stdout: YYYY-mm-dd HH:MM:ss INFO msg
function log_info() { date +o"%F %T INFO $1" ; }
function log_error() { date +o"%F %T ERROR $1" ; }

#01. Locate java(JRE)
java_in_wrapper="`sed -nr \
-e 's/^wrapper.java.command=(.*)[[:blank:]]*$/\1/p' \
$MYCAT_HOME/conf/wrapper.conf`"

# test java(JRE) in this order:
# wrapper.conf's java -> $JAVA_HOME/bin/java -> $PATH/java
for java_cmd in "$java_in_wrapper" "$JAVA_HOME/bin/java" "java" ; do
if $java_cmd -Xmx1m -version &>/dev/null ; then
JAVA_CMD=$java_cmd
break
fi
done

if [ "$JAVA_CMD" == "" ]; then
cat <<EOF
`date +'%F %T'` ERROR Not found usable java in following path:
$java_in_wrapper, $JAVA_HOME/bin/java, \$PATH/java
Operations would not going on.
EOF
exit 1
fi

log_info "JAVA_CMD=$JAVA_CMD"

#02. Initialize /mycat of ZooKeeper
log_info "Start to initialize /mycat of ZooKeeper"

if ! $JAVA_CMD -Xms256M -Xmx1G -DMYCAT_HOME=$MYCAT_HOME -cp "$MYCAT_HOME/conf:$MYCAT_HOME/lib/*" $MAIN_CLASS ; then
log_error "Something wrong happened, please refer logs above"
exit 1
fi

echo "---------set HOME_DIR------------"
CURR_DIR=`pwd`
cd ..
MYCAT_HOME=`pwd`
cd $CURR_DIR
$JAVA_CMD -Xms256M -Xmx1G -XX:MaxPermSize=64M -DMYCAT_HOME=$MYCAT_HOME -cp "$MYCAT_HOME/conf:$MYCAT_HOME/lib/*" $MAIN_CLASS
echo "---------finished------------"
log_info "Done"
exit 0
77 changes: 53 additions & 24 deletions src/main/java/io/mycat/MycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,36 @@
*/
package io.mycat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import io.mycat.buffer.NettyBufferPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.io.Files;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import io.mycat.backend.BackendConnection;
import io.mycat.backend.datasource.PhysicalDBNode;
import io.mycat.backend.datasource.PhysicalDBPool;
Expand Down Expand Up @@ -58,7 +87,7 @@
import io.mycat.route.MyCATSequnceProcessor;
import io.mycat.route.RouteService;
import io.mycat.route.factory.RouteStrategyFactory;
import io.mycat.route.sequence.handler.*;
import io.mycat.route.sequence.handler.SequenceHandler;
import io.mycat.server.ServerConnectionFactory;
import io.mycat.server.interceptor.SQLInterceptor;
import io.mycat.server.interceptor.impl.GlobalTableUtil;
Expand All @@ -71,26 +100,7 @@
import io.mycat.util.ExecutorUtil;
import io.mycat.util.NameableExecutor;
import io.mycat.util.TimeUtil;

import java.io.*;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import io.mycat.util.ZKUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
* @author mycat
Expand All @@ -115,7 +125,7 @@ public class MycatServer {
private volatile int channelIndex = 0;

//全局序列号
private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();
// private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();
private final DynaClassLoader catletClassLoader;
private final SQLInterceptor sqlInterceptor;
private volatile int nextProcessor;
Expand Down Expand Up @@ -147,6 +157,7 @@ public static final MycatServer getInstance() {
private NIOProcessor[] processors;
private SocketConnector connector;
private NameableExecutor businessExecutor;
private NameableExecutor sequenceExecutor;
private NameableExecutor timerExecutor;
private ListeningExecutorService listeningExecutorService;
private InterProcessMutex dnindexLock;
Expand Down Expand Up @@ -224,7 +235,7 @@ public DynaClassLoader getCatletClassLoader() {
}

public MyCATSequnceProcessor getSequnceProcessor() {
return sequnceProcessor;
return MyCATSequnceProcessor.getInstance();
}

public SQLInterceptor getSqlInterceptor() {
Expand Down Expand Up @@ -298,6 +309,9 @@ public void startup() throws IOException {

SystemConfig system = config.getSystem();
int processorCount = system.getProcessors();

//init RouteStrategyFactory first
RouteStrategyFactory.init();

// server startup
LOGGER.info(NAME + " is ready to startup ...");
Expand Down Expand Up @@ -356,6 +370,11 @@ public void startup() throws IOException {
*/

totalNetWorkBufferSize = 6*bufferPoolPageSize * bufferPoolPageNumber;
break;
case 2:
bufferPool = new NettyBufferPool(bufferPoolChunkSize);
LOGGER.info("Use Netty Buffer Pool");

break;
default:
bufferPool = new DirectByteBufferPool(bufferPoolPageSize,bufferPoolChunkSize,
Expand All @@ -377,6 +396,7 @@ public void startup() throws IOException {
}
businessExecutor = ExecutorUtil.create("BusinessExecutor",
threadPoolSize);
sequenceExecutor = ExecutorUtil.create("SequenceExecutor", threadPoolSize);
timerExecutor = ExecutorUtil.create("Timer", system.getTimerExecutor());
listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor);

Expand Down Expand Up @@ -474,7 +494,7 @@ public Thread newThread(Runnable r) {
//定期清理结果集排行榜,控制拒绝策略
scheduler.scheduleAtFixedRate(resultSetMapClear(),0L, system.getClearBigSqLResultSetMapMs(),TimeUnit.MILLISECONDS);

RouteStrategyFactory.init();
// new Thread(tableStructureCheck()).start();

//XA Init recovery Log
Expand Down Expand Up @@ -976,13 +996,21 @@ private CoordinatorLogEntry[] getCoordinatorLogEntries(){
if(allCoordinatorLogEntries.size()==0){return new CoordinatorLogEntry[0];}
return allCoordinatorLogEntries.toArray(new CoordinatorLogEntry[allCoordinatorLogEntries.size()]);
}

public NameableExecutor getSequenceExecutor() {
return sequenceExecutor;
}


//huangyiming add
public DirectByteBufferPool getDirectByteBufferPool() {
return (DirectByteBufferPool)bufferPool;
}

public boolean isAIO() {
return aio;
}


public ListeningExecutorService getListeningExecutorService() {
return listeningExecutorService;
}
Expand All @@ -995,4 +1023,5 @@ public static void main(String[] args) throws Exception {
byte[] data= zk.getData().forPath(path);
System.out.println(data.length);
}

}
12 changes: 7 additions & 5 deletions src/main/java/io/mycat/backend/ConQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public void incExecuteCount() {
this.executeCount++;
}

public void removeCon(BackendConnection con) {
if (!autoCommitCons.remove(con)) {
manCommitCons.remove(con);
public boolean removeCon(BackendConnection con) {
boolean removed = autoCommitCons.remove(con);
if (!removed) {
return manCommitCons.remove(con);
}
return removed;
}

public boolean isSameCon(BackendConnection con) {
Expand All @@ -65,13 +67,13 @@ public ArrayList<BackendConnection> getIdleConsToClose(int count) {
count);
while (!manCommitCons.isEmpty() && readyCloseCons.size() < count) {
BackendConnection theCon = manCommitCons.poll();
if (theCon != null) {
if (theCon != null&&!theCon.isBorrowed()) {
readyCloseCons.add(theCon);
}
}
while (!autoCommitCons.isEmpty() && readyCloseCons.size() < count) {
BackendConnection theCon = autoCommitCons.poll();
if (theCon != null) {
if (theCon != null&&!theCon.isBorrowed()) {
readyCloseCons.add(theCon);
}

Expand Down
Loading

0 comments on commit c905aae

Please sign in to comment.