一个小巧的MySQL Shell

发表于:2007-05-25 来源: 作者: 点击数: 标签:MySQL shell 小巧 usr bin
#!/usr/bin/python # -*- coding: cp936 -*- ################################# # Written by caocao # # caocao@eastday.com # # http://nethermit.yeah.net # ################################# import sys, string, os from types import * import MySQLdb ...

#!/usr/bin/python
# -*- coding: cp936 -*-

#################################
#   Written by caocao           #
#   caocao@eastday.com          #
#   http://nethermit.yeah.net   #
#################################

import sys, string, os
from types import *
import MySQLdb

# Start-up banner: three author-credit lines followed by one blank line.
# Each print(...) takes a single argument, so the output is the same text
# the separate print statements produced.
print("\n".join((
    "Written by caocao",
    "caocao@eastday.com",
    "http://nethermit.yeah.net",
)))
print("")

def iif(expression, whenTrue, whenFalse):
    """Return whenTrue if expression is truthy, otherwise whenFalse.

    Both candidate values are ordinary arguments, so the caller evaluates
    them before this function runs; nothing is short-circuited here.
    """
    if not expression:
        return whenFalse
    return whenTrue

class mysqlTest:
    """Tiny interactive front end over a MySQLdb connection.

    Creating an instance prints a banner, connects to the server (exiting
    the process with status 1 on failure) and prints a few facts about the
    connection.  runSQL() then executes one statement at a time and dumps
    the outcome to stdout.

    All output uses the single-argument print(...) form, which emits the
    same text whether print is a statement or a function.
    """

    def __init__(self, host="localhost", user="root", passwd="", db=""):
        """Connect to the server and print connection details.

        host, user, passwd and db are forwarded as keyword arguments to
        MySQLdb.connect().  If connecting fails, a message is printed and
        the process exits with status 1.
        """
        self.connection = None
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.result = None

        print("-" * 40)
        print("MySQL Shell v 1.0")
        print("Usage: python mysql.shell.py [host] [user] [passwd(% is empty)] [db]")
        print("Connect...")
        try:
            self.connection = MySQLdb.connect(host=self.host, user=self.user, passwd=self.passwd, db=self.db)
        except Exception:
            # Exception instead of a bare except, so Ctrl-C or an explicit
            # exit is not misreported as a login problem.
            print("Can't connect to mysql server.\nPlease make sure your username or password is right.")
            sys.exit(1)
        print("-" * 40)
        for infoName in ("get_server_info", "get_host_info", "get_proto_info",
                         "info", "character_set_name", "thread_id", "stat"):
            print(self.printComment("connection", infoName))

    def __del__(self):
        """Close the connection, if one was opened, and print a farewell."""
        # getattr: the attribute may be missing if the instance was never
        # fully initialised.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
        print("-" * 40)
        print("Quit...")

    def printComment(self, instance, function):
        """Return a "name = value" line for one zero-argument method call.

        instance is the name of an attribute of self ("connection" or
        "result" in this file) and function is the name of a method on
        that attribute.  The method is called without arguments and its
        result is formatted after the method name, which is right-justified
        to 18 columns.
        """
        # Attribute lookup replaces building and eval()-ing source text.
        target = getattr(self, instance)
        value = getattr(target, function)()
        return "%s = %s" % (function.rjust(18), value)

    def printAll(self):
        """Return the current result set as text, one item per line.

        The column names come first (as repr() of the first element of
        each describe() entry), then every field of every fetched row.
        str values are emitted as-is, anything else through repr().  Each
        item is followed by a newline.
        """
        rows = self.result.fetch_row(0)
        # describe() is asked once rather than once per column.
        items = [repr(column[0]) for column in self.result.describe()]
        for row in rows:
            for value in row:
                if isinstance(value, str):
                    items.append(value)
                else:
                    items.append(repr(value))
        return "".join([item + "\n" for item in items])

    def runSQL(self, queryString="show databases"):
        """Execute queryString and print statistics and any rows returned.

        A failing query prints "Can't run sql." and returns.  Statements
        that produce no result set only get the connection-level
        statistics printed.
        """
        print("-" * 40)
        try:
            self.connection.query(queryString)
        except Exception:
            print("Can't run sql.")
            return
        self.result = self.connection.store_result()
        for statName in ("field_count", "affected_rows", "insert_id"):
            print(self.printComment("connection", statName))
        if self.result is None:
            # store_result() gave no result object (no result set for this
            # statement), so there are no result statistics or rows to show.
            return
        for statName in ("num_fields", "num_rows", "field_flags"):
            print(self.printComment("result", statName))
        print("-" * 40)
        print(self.printAll())

if __name__ == "__main__":
    # Command-line arguments map positionally onto
    # mysqlTest(host, user, passwd, db).  An argument that is exactly "%"
    # stands for an empty value, as the usage text describes for the
    # password.  The arguments are passed as real values instead of being
    # pasted into source text and eval()-ed, so quotes, backslashes or a
    # "%" inside a value no longer break or alter the call.  sys.argv is
    # left unmodified.
    cliArgs = []
    for rawArg in sys.argv[1:]:
        if rawArg == "%":
            cliArgs.append("")
        else:
            cliArgs.append(rawArg)
    test = mysqlTest(*cliArgs)

    # Read-eval loop: end-of-input, "exit" or "quit" (any letter case)
    # leaves the shell; blank input is ignored; everything else is run as
    # SQL.
    while True:
        try:
            command = raw_input("PS mysql>").strip(" ")
        except EOFError:
            break
        commandLow = command.lower()
        if commandLow in ("exit", "quit"):
            break
        if commandLow == "":
            continue
        test.runSQL(command)

原文转自:http://www.ltesting.net