背景介绍
由于历史原因,我们部门的 MySQL 中间件既有 Mycat , 也有 Cobar 。 Cobar 号称支持事务, 但是居然不支持 START TRANSACTION 和 BEGIN 显式地开启事务。
单库事务完全支持,分布式事务不能保持强一致性。
分布式事务采用两阶段执行,即分为执行阶段和提交阶段
执行阶段:把前端连接上当前事务所使用到的后端连接绑定下来,并执行SQL语句
提交阶段:将commit命令分发到这些绑定的后端连接中。
在整个事务过程中,执行阶段出错,可以回滚。提交阶段出错不可以回滚。可以说只要是commit之前,执行出现不一致,cobar会自动回滚。
如何兼容
可以通过 SET AUTOCOMMIT=0 来开启事务, 但是事务结束后,马上又开启了新的事务,如果后面的语句不希望起事务,那么一定要在这次事务中提交 SET AUTOCOMMIT=1 ,使改动立即生效。
我们用的 mysql 驱动是 go-sql-driver/mysql ,为了适配 Cobar,只能改代码了。
connection.go
-err := mc.exec("START TRANSACTION")
+err := mc.exec("SET AUTOCOMMIT=0")
transaction.go
err = tx.mc.exec("COMMIT")
+err = tx.mc.exec("SET AUTOCOMMIT=1")
err = tx.mc.exec("ROLLBACK")
+err = tx.mc.exec("SET AUTOCOMMIT=1")
详细改动 见此
改完是能用的。但是后来这个项目迁移到我们的 SOA 框架去了,那个框架的基础库也有一份 go-sql-driver/mysql (没改过的),而且因为框架的设计原因,这个库一定会被引入的
_ "github.com/go-sql-driver/mysql"
问题是,我还得用改过的那份代码呢,并且二者是不能同时引入的。这就蛋疼了。
为何不能同时引入 github.com/go-sql-driver/mysql 和 github.com/yiyulantian/mysql (就是上面提到改动的那份fork) 呢? 其实它俩对于 database/sql 而言,是同一个数据库驱动。
go-sql-driver/mysql 是这样注册进去的
func init() {
sql.Register("mysql", &MySQLDriver{})
}
而 sql.Register 做了名称的唯一性检查:
// Register makes a database driver available by the provided name. // If Register is called twice with the same name or if driver is nil, // it panics. func Register(name string, driver driver.Driver) { driversMu.Lock() defer driversMu.Unlock() if driver == nil { panic("sql: Register driver is nil") } // 看到没,重复了会 panic 的 if _, dup := drivers[name]; dup { panic("sql: Register called twice for driver " + name) } drivers[name] = driver } |
对于这种 panic, 总不能为了绕过去而做一次 recover 吧。
于是我决定曲线救国, 改个名字。
func init() {
- sql.Register("mysql", &MySQLDriver{})
+ sql.Register("mysql_cobar", &MySQLDriver{})
}
在使用上,创建一个 DB 对象就是这样啦:
db, err = sql.Open("mysql_cobar", "YOUR_DSN")
希望 DBA 快点废掉 cobar !
Cobar 和 Mycat 对 BEGIN 和 START TrANSACTION 的代码实现
先看下 Cobar(不贴代码了,自己点链接去看)
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.cobar.server.handler; import com.alibaba.cobar.config.ErrorCode; import com.alibaba.cobar.server.ServerConnection; /** * @author xianmao.hexm */ public final class BeginHandler { public static void handle(String stmt, ServerConnection c) { c.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported statement"); } } |
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.cobar.server.handler; import com.alibaba.cobar.config.ErrorCode; import com.alibaba.cobar.server.ServerConnection; import com.alibaba.cobar.server.parser.ServerParse; import com.alibaba.cobar.server.parser.ServerParseStart; /** * @author xianmao.hexm */ public final class StartHandler { public static void handle(String stmt, ServerConnection c, int offset) { switch (ServerParseStart.parse(stmt, offset)) { case ServerParseStart.TRANSACTION: c.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported statement"); break; default: c.execute(stmt, ServerParse.START); } } } |
再看下 Mycat
/* * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software;Designed and Developed mainly by many Chinese * opensource volunteers. you can redistribute it and/or modify it under the * terms of the GNU General Public License version 2 only, as published by the * Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Any questions about this component can be directed to it's project Web address * https://code.google.com/p/opencloudb/. * */ package io.mycat.server.handler; import io.mycat.server.ServerConnection; /** * @author mycat */ public final class BeginHandler { private static final byte[] AC_OFF = new byte[] { 7, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0 }; public static void handle(String stmt, ServerConnection c) { if (c.isAutocommit()) { c.setAutocommit(false); c.write(c.writeToBuffer(AC_OFF, c.allocate())); }else { c.getSession2().commit() ; } } } |
/* * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software;Designed and Developed mainly by many Chinese * opensource volunteers. you can redistribute it and/or modify it under the * terms of the GNU General Public License version 2 only, as published by the * Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Any questions about this component can be directed to it's project Web address * https://code.google.com/p/opencloudb/. * */ package io.mycat.server.handler; import io.mycat.config.ErrorCode; import io.mycat.server.ServerConnection; import io.mycat.server.parser.ServerParse; import io.mycat.server.parser.ServerParseStart; /** * @author mycat */ public final class StartHandler { private static final byte[] AC_OFF = new byte[] { 7, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0 }; public static void handle(String stmt, ServerConnection c, int offset) { switch (ServerParseStart.parse(stmt, offset)) { case ServerParseStart.TRANSACTION: if (c.isAutocommit()) { c.setAutocommit(false); c.write(c.writeToBuffer(AC_OFF, c.allocate())); }else { c.getSession2().commit() ; } break; default: c.execute(stmt, ServerParse.START); } } } |
代码链接都是截止至 2017.01.19 最新的commit。