很多童鞋初学Python,学习了语法和基础类库后,开始迷茫如何实际使用到工作中去,其实Python可以做的事情是很多的,将日常工作的一些事情自动化,对我们的工作效率有很大的提升。
本文面向Py新手,分享一些辅助工作的小工具思路。以下例子都是在Win10 + Py3.5下完成。
调用CMD
subprocess是Python自带的子进程管理模块,定义有数个创建子进程的函数,也提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
简单理解就是,你通过CMD敲的命令,都基本可以用subprocess来实现批量处理。
例子1:批量SVN操作
以更新SVN为例,这是一个频繁的操作,尤其是多个SVN目录需要一一更新的时候,手动起来是挺麻烦的。
import subprocess
subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\策划文档" /closeonend:0')
subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\配置文档" /closeonend:0')
例子2:adb命令的封装
做安卓手游测试的时候,adb是常用工具,我们可以通过它,进行apk的安装,卸载,截图,获取APK信息,性能数据,获取手机信息等等操作。
比如获取当前运行在前台的apk的package和activity名称
def run_cmd(cmd): """执行CMD命令""" p = subprocess.Popen(cmd, stdout=subprocess.PIPE) return [i.decode() for i in p.communicate()[0].splitlines()] def get_apk_info(): """获取apk的package,activity名称 :return: list eg ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity'] """ result = run_cmd("adb shell dumpsys activity top") for line in result: if line.strip().startswith('ACTIVITY'): return line.split()[1].split('/') print(get_apk_info()) output: ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity'] |
比如查看当前apk的内存占用
def get_mem_using(package_name=None): """查看apk的内存占用 :param package_name: :return: 单位KB """ if not package_name: package_name = get_apk_info()[0] result = run_cmd("adb shell dumpsys meminfo {}".format(package_name)) info = re.search('TOTAL\W+\d+', str(result)).group() mem = '' try: mem = info.split() except Exception as e: print(info) print(e) return mem[-1] output: 37769 |
比如备份当前apk到桌面
def backup_current_apk(path=r"C:\Users\jianbing\Desktop\apks"): package = get_apk_info()[0] result = run_cmd("adb shell pm path {}".format(package)) cmd = "adb pull {} {}".format(result[0].split(":")[-1], os.path.join(path, "{}.apk".format(package))) print(cmd) run_cmd(cmd) |
再进一步,将常用的adb操作封装为一个ADB工具类。社区里也有童鞋之前分享过,传送门。
处理文本
例子:在整个文件夹中搜索关键字
某天策划说,这个版本他删掉了某个道具,让我检查下有没有删漏的地方,这个道具产出的地方不少,最佳的检查方式是各个相关配置表看下还有没有配置这个道具。
那就写个脚本遍历整个文件夹来搜索指定关键字吧。
import os def get_files_by_suffix(path, suffixes=("txt", "xml"), traverse=True): """从path路径下,找出全部指定后缀名的文件 :param path: 根目录 :param suffixes: 指定查找的文件后缀名 :param traverse: 如果为False,只遍历一层目录 :return: """ file_list = [] for root, dirs, files in os.walk(path): for file in files: file_suffix = os.path.splitext(file)[1][1:].lower() # 后缀名 if file_suffix in suffixes: file_list.append(os.path.join(root, file)) if not traverse: return file_list return file_list if __name__ == '__main__': keyword = "XXX宝箱" files = get_files_by_suffix(r"C:\project\config") for file in files: with open(file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read().lower() position = content.find(keyword.lower()) if position != -1: print("Find in {0}".format(file)) start = position - 100 if position - 100 > 0 else 0 end = position + 100 if position + 100 < len(content) else len(content) print(content[start:end]) print("_" * 100) |
操作远程服务器
例子1:查看内网发版时间
有时候问开发,最近一次内网服务端发版是什么时候?开发回答:有点忘记了。。那就得自力更生了~
手动方式:使用FTP软件连入内网服务器,查看文件的更新日期,从而知道发版时间。
懒人方式:Py大法好~
paramiko是Python很有名的第三方库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。
import paramiko import time _transport = paramiko.Transport("192.168.1.10:22") _transport.connect(username="root", password="XXXXXX") sftp = paramiko.SFTPClient.from_transport(_transport) result = sftp.listdir_attr("/data/www/sg/sg_dev/socket/conf/config/treasure") print("发版时间是:{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result[0].st_mtime)))) sftp.close() |
例子2:查看内网报错信息
在进行测试的时候,需要多留意服务端是否有新的报错信息,有些报错在客户端并没有什么表现,比如数据进库失败,手动方式:通过SecureCRT连入内网服务器,CD到Log目录下,然后tail -n 200 sg_error.log 查看最新的报错信息。
于是萌生了写一个小工具来定时检测,发现报错信息就保存起来的想法。
import datetime import paramiko import time import os class ScanError(object): def __init__(self): self._ssh = paramiko.SSHClient() self.last_error_log = None self._init() def _init(self): os.chdir("data") # 打算将报错信息保存到data目录下 self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self._ssh.connect("192.168.1.10", username="root", password="XXXXXX") error_log = self.get_error_log(500) self.last_error_log = error_log # 检测最近三天有没有报错信息 today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) the_day_before_yesterday = today - datetime.timedelta(days=2) error_log_str = "\n".join(error_log) if error_log_str.find(str(today)) > -1 or error_log_str.find(str(yesterday)) > -1 or error_log_str.find(str(the_day_before_yesterday)) > -1: self.save_error_log("error.txt", error_log) print('内网最近三天有错误信息,请查看') os.popen('error.txt') def get_error_log(self, num=200): cmd = 'cd /data/www/sg/sg_dev/socket/log&&tail -n {} sg_error.log'.format(num) stdin, stdout, stderr = self._ssh.exec_command(cmd) error_log = [i.decode("utf-8") for i in stdout.read().splitlines() if i] return error_log @staticmethod def save_error_log(file_name, log: list): with open(file_name, 'w', encoding='utf-8') as f: f.write("\n".join(log)) def run_forever(self, interval=30, show_error=True): """运行检测工具 :param interval: 检测间隔 :param show_error: 是否检测到报错就自动弹出显示 :return: """ while 1: time.sleep(interval) error_log = self.get_error_log() if error_log != self.last_error_log and "\n".join(set(error_log) - set(self.last_error_log)).find("ERROR") > -1: self.last_error_log = error_log file_name = time.strftime("%Y-%m-%d-%H-%M-%S.txt", time.localtime(time.time())) print('{} 检测到内网有新的错误信息'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))) self.save_error_log(file_name, error_log) if show_error: os.popen(file_name) if __name__ == '__main__': ScanError().run_forever() |
数据库操作
例子:找号功能
内网的服务器建了N多的号,有时候看着排行榜某个帐号,想登录看下数据,可以使用Python写一个连接数据库的找号脚本。
import cymysql player_name = "XXXXXX" conn = cymysql.connect(host='XXXXXX', user='sg', passwd='XXXXXX', db="dev", charset='utf8') cur = conn.cursor() sql = "select * from Player where name like '%{0}%'".format(player_name) # 模糊搜索,从玩家名称搜索玩家ID cur.execute(sql) for r in cur.fetchall(): sql = "select * from Account where uid = '{0}'".format(r[0]) # 从玩家ID搜索玩家帐号 cur.execute(sql) for row in cur.fetchall(): print('{0}, {1}, {2}'.format(r[0], r[1], row[2])) # 打印相关信息 conn.close() |
扩展开发提供的工具
在之前某个项目,开发做了一个给游戏帐号发道具的网页,提供测试使用,操作流程是这样的,在网页上的表单里边,填写玩家的ID,在下拉列表选中要发送的道具(支持模糊搜索),填写数量。
这个网页使用起来,工作效率不高的地方就是,每次添加道具,都需要重新选择道具和填写数量,且添加过程没有记录下来,无法复用。
优化方案,在网页上点击添加道具,其实就是网页给游戏服务器发送了一个HTTP请求,那就直接让Python来代劳吧~
import requests player_id = 1100000103 server_ip = "192.168.1.21:5000" data = {"stuffList": []} url = "http://{}/api/{}/stuff".format(server_ip, player_id) data["stuffList"].append({"itemID": 1104000007, "number": 1000}) # itemID为1104000007的物品,数量1000 data["stuffList"].append({"itemID": 1104000008, "number": 1000}) data["stuffList"].append({"itemID": 1104000009, "number": 1000}) data["stuffList"].append({"itemID": 1104000010, "number": 1000}) data["stuffList"].append({"itemID": 1104000011, "number": 1000}) data["stuffList"].append({"itemID": 1104000012, "number": 1000}) requests.post(url, json=data, timeout=5) # 添加道具 # requests.delete(url, json=data, timeout=5) # 删除道具 |
有没有发现,每个脚本都很简短~