''' author: utek page: http://utek.pl mail: mail_a_t_utek.pl Functions for opening and operations on Microsoft Access .mdb files ''' import win32com.client def openMDB(_filename, _user = '', _pass=''): '''Opens .mdb file with supplied user and password''' conn = win32com.client.Dispatch(r'ADODB.Connection') DSN = 'Provider=MSDASQL;DRIVER={Microsoft Access Driver (*.mdb)};DBQ=' + _filename + ';uid=' + _user + ';Pwd=' + _pass try: conn.Open(DSN) return conn except: if __name__ == "__main__": print "OpenMDB(): Error while opening file" return None # def closeMDB(_conn): '''Close opened connection to .mdb file''' try: _conn.Close() return 0 except: if __name__ == "__main__": print "CloseMDB(): Supplied argument is not proper connection type" return 1; # def getRS(self): '''Returns Record Set (ADODB.RecordSet)''' rs = win32com.client.Dispatch(r'ADODB.Recordset') return rs # def tableDef(_name, _conn): '''Returns list of dictionares describing sql_table {name, type, DefSize} ''' rs = win32com.client.Dispatch(r'ADODB.Recordset') rs.Open('[' + _name + ']', _conn, 1, 3) tabela = [] for x in range(rs.Fields.Count): tabela.append({"Name":rs.Fields.Item(x).Name, "Type":rs.Fields.Item(x).Type, "DefSize":rs.Fields.Item(x).DefinedSize}) return tabela # def iType2Str(_num): '''Convert ADO type(Integer) to Sql Type(String)''' num={20:"bigint",3:"int",2:"smallint",17:"tinyint",6:"money",135:"datetime",11:"bit",128:"timestamp",202:"varchar",130:"char",5:"float"} try: result = num[_num] return result except: result = "brak" return result # def getTables(_conn): '''Get tables list''' _cat = win32com.client.Dispatch(r'ADOX.Catalog') _cat.ActiveConnection = _conn tab = _cat.Tables list = [] for i in tab: if i.Type == 'TABLE': list.append(i.Name) elif i.Type == 'VIEW': list.append(i.Name) return list # def getData(_table, _conn): '''Returns all records in rows as a list of lists of dictionares ''' rs = win32com.client.Dispatch(r'ADODB.Recordset') rs.Open('[' + _table + ']', _conn, 1, 3) tabela = tableDef(_table, _conn) row = [] result = [] if rs.EOF: return result rs.MoveFirst() count = 0; while 1: if rs.EOF: break else: for i in range(len(tabela)): row.append({"Value":dataConvert(rs.Fields(tabela[i]["Name"]).Value,tabela[i]["Type"]),"Name":str(tabela[i]["Name"])}) result.append(row) row = [] rs.MoveNext() count = count + 1 return result # def insertIntoTable(_conn, _table, _data): '''inserts into provided table data in format [list][n-lists]{dictionary} [["row num"][{"Name":"Column name","Value":"Row Value},...]"''' rs = win32com.client.Dispatch(r'ADODB.Recordset') rs.Open('[' + _table + ']', _conn, 1, 3) table = tableDef(_table, _conn) count = 0 for i in range(len(_data)): rs.AddNew() for j in range(len(_data[i])): print _data[i][j]["Value"], _data[i][j]["Name"] print str(rs.Fields(_data[i][j]["Name"]).Value), str(_data[i][j]["Name"]) rs.Fields.Item(_data[i][j]["Name"]).Value = _data[i][j]["Value"] rs.Update() return 0 # def dataConvert(_value): '''Converts currency format from (m,n) to n/1000 to solve python currency problem''' _value = _value[1]/1000 return _value # def createAlter(_table, _lista): sql = "ALTER TABLE " + str(_table) + " ADD " sql_tmp = "" for i in range(len(_lista)): ''' if _lista[i]["Type"] == 202: typ = iTypeToStr(_lista[i]["Type"]) + "(" + _lista[i]["DefSize"] + ")" elif _lista[i]["Type"] == 202: typ = iTypeToStr(_lista[i]["Type"]) + "(" + _lista[i]["DefSize"] + ")" ''' typ = iType2Str(_lista[i]["Type"]) + "(" + str(_lista[i]["DefSize"]) + ")" sql_tmp = sql_tmp + _lista[i]['Name'] + " " + typ + ", " sql = sql + sql_tmp return sql[:-2] + ";" # # if __name__ == "__main__": conn = openMDB("pusta.mdb") conn2 = openMDB("tachospeed.mdb") lista = getTables(conn) lista_out = getTables(conn2) tabela=[] tabela_out = [] for i in range(len(lista)): tabela.append(tableDef(lista[i],conn)) for i in range(len(lista_out)): tabela_out.append(tableDef(lista_out[i],conn2)) count = 0 for i in range(len(lista_out)): try: temp = getData(lista_out[i],conn) if(len(temp)==0): print count print temp[0:2] print len(temp) count = count + 1 except: print "pusta" closeMDB(conn)