让Go-sql-driver/Mysql 兼容Cobar的事务

发表于:2017-1-25 09:35

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:holys    来源:51Testing软件测试网采编

  背景介绍
  由于历史原因,我们部门的 MySQL 中间件既有 Mycat , 也有 Cobar 。 Cobar 号称支持事务, 但是居然不支持 START TRANSACTION 和 BEGIN 显式地开启事务。
  单库事务完全支持,分布式事务不能保持强一致性。
  分布式事务采用两阶段执行,即分为执行阶段和提交阶段
  执行阶段:把前端连接上当前事务所使用到的后端连接绑定下来,并执行SQL语句
  提交阶段:将commit命令分发到这些绑定的后端连接中。
  在整个事务过程中,执行阶段出错,可以回滚。提交阶段出错不可以回滚。可以说只要是commit之前,执行出现不一致,cobar会自动回滚。
  如何兼容
  可以通过 SET AUTOCOMMIT=0 来开启事务, 但是事务结束后,马上又开启了新的事务,如果后面的语句不希望起事务,那么一定要在这次事务中提交 SET AUTOCOMMIT=1 ,使改动立即生效。
  我们用的 mysql 驱动是 go-sql-driver/mysql ,为了适配 Cobar,只能改代码了。
  connection.go
  -err := mc.exec("START TRANSACTION")
  +err := mc.exec("SET AUTOCOMMIT=0")
  transaction.go
  err = tx.mc.exec("COMMIT")
  +err = tx.mc.exec("SET AUTOCOMMIT=1")
  err = tx.mc.exec("ROLLBACK")
  +err = tx.mc.exec("SET AUTOCOMMIT=1")
  详细改动 见此
  改完是能用的。但是后来这个项目迁移到我们的 SOA 框架去了,那个框架的基础库也有一份 go-sql-driver/mysql (没改过的),而且因为框架的设计原因,这个库一定会被引入的
  _  "github.com/go-sql-driver/mysql"
  问题是,我还得用改过的那份代码呢,并且二者是不能同时引入的。这就蛋疼了。
  为何不能同时引入 github.com/go-sql-driver/mysql 和 github.com/yiyulantian/mysql (就是上面提到改动的那份fork) 呢? 其实它俩对于 database/sql 而言,是同一个数据库驱动。
  go-sql-driver/mysql 是这样注册进去的
  func init() {
  sql.Register("mysql", &MySQLDriver{})
  }
  而 sql.Register 做了名称的唯一性检查:
// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
// 看到没,重复了会 panic 的
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
drivers[name] = driver
}
  对于这种 panic, 总不能为了绕过去而做一次 recover 吧。
  于是我决定曲线救国, 改个名字。
  func init() {
  - sql.Register("mysql", &MySQLDriver{})
  + sql.Register("mysql_cobar", &MySQLDriver{})
  }
  在使用上,创建一个 DB 对象就是这样啦:
  db, err = sql.Open("mysql_cobar", "YOUR_DSN")
  希望 DBA 快点废掉 cobar !
  Cobar 和 Mycat 对 BEGIN 和 START TrANSACTION 的代码实现
  先看下 Cobar(不贴代码了,自己点链接去看)
/*
 * Copyright 1999-2012 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.cobar.server.handler;

import com.alibaba.cobar.config.ErrorCode;
import com.alibaba.cobar.server.ServerConnection;

/**
 * @author xianmao.hexm
 */
public final class BeginHandler {

    public static void handle(String stmt, ServerConnection c) {
        c.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported statement");
    }

}
/*
 * Copyright 1999-2012 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.cobar.server.handler;

import com.alibaba.cobar.config.ErrorCode;
import com.alibaba.cobar.server.ServerConnection;
import com.alibaba.cobar.server.parser.ServerParse;
import com.alibaba.cobar.server.parser.ServerParseStart;

/**
 * @author xianmao.hexm
 */
public final class StartHandler {

    public static void handle(String stmt, ServerConnection c, int offset) {
        switch (ServerParseStart.parse(stmt, offset)) {
        case ServerParseStart.TRANSACTION:
            c.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unsupported statement");
            break;
        default:
            c.execute(stmt, ServerParse.START);
        }
    }

}
  再看下 Mycat
/*
 * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software;Designed and Developed mainly by many Chinese 
 * opensource volunteers. you can redistribute it and/or modify it under the 
 * terms of the GNU General Public License version 2 only, as published by the
 * Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 * 
 * Any questions about this component can be directed to it's project Web address 
 * https://code.google.com/p/opencloudb/.
 *
 */
package io.mycat.server.handler;

import io.mycat.server.ServerConnection;

/**
 * @author mycat
 */
public final class BeginHandler {
    private static final byte[] AC_OFF = new byte[] { 7, 0, 0, 1, 0, 0, 0, 0,
            0, 0, 0 };
    public static void handle(String stmt, ServerConnection c) {
        if (c.isAutocommit())
        {
            c.setAutocommit(false);
            c.write(c.writeToBuffer(AC_OFF, c.allocate()));
        }else
        {
            c.getSession2().commit() ;
        }
    }

}
/*
 * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software;Designed and Developed mainly by many Chinese 
 * opensource volunteers. you can redistribute it and/or modify it under the 
 * terms of the GNU General Public License version 2 only, as published by the
 * Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 * 
 * Any questions about this component can be directed to it's project Web address 
 * https://code.google.com/p/opencloudb/.
 *
 */
package io.mycat.server.handler;

import io.mycat.config.ErrorCode;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
import io.mycat.server.parser.ServerParseStart;

/**
 * @author mycat
 */
public final class StartHandler {
    private static final byte[] AC_OFF = new byte[] { 7, 0, 0, 1, 0, 0, 0, 0,
            0, 0, 0 };
    public static void handle(String stmt, ServerConnection c, int offset) {
        switch (ServerParseStart.parse(stmt, offset)) {
        case ServerParseStart.TRANSACTION:
            if (c.isAutocommit())
            {
                c.setAutocommit(false);
                c.write(c.writeToBuffer(AC_OFF, c.allocate()));
            }else
            {
                c.getSession2().commit() ;
            }
            break;
        default:
            c.execute(stmt, ServerParse.START);
        }
    }

}
  代码链接都是截止至 2017.01.19 最新的commit。
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号