ADO.net DataTable 和Amazon SimpleDB的相互转换
Amazon SimpleDB是灵活的,高可用性的基于云的数据库。它和我们常见的SQL Server,Oracle等等不同,是非关系型的,也就是每一行不一定有同样的列数。另外还有个特点就是它把所有的值都按照字符串来保存,并且在查询时也是按照字典来排序的,所以在保存数字,时间和负数时,最好先处理一下,AWS SDK中提供了对应的Encode和Decode的工具。这篇文章介绍了如何把一个DataTable上传到SimpleDB,并且把SimpleDB下载到一个DataTable中。由于SimpleDB只按字符串保存,为了再还原为DataTable时还能保存原先的数据类型,我的方法是把数据类型放在attribute的Name里,数据类型和数据字段名用冒号分开,比如 System.Int32:ID。 下面就是相互转换的代码。
Amazon SimpleDB is a highly available and flexible cloud-based database. It differs from commonly used databases such as SQL Server or MySQL in that it is a non-relational data store: data is organized into domains, and queries can be run across all of the data stored in a particular domain. Domains are collections of items that are described by attribute-value pairs. But we already have a lot of data in relational databases, so how do we move that data up to the cloud? I created a simple tool to upload an ADO.NET DataTable to Amazon SimpleDB and vice versa, that is, to retrieve data from SimpleDB and save it into a DataTable.
Since Amazon SimpleDB is a schema-less data store and everything is stored as a UTF-8 string value, I need a way to preserve the schema (data types) of the DataTable so that it can be used when re-creating the DataTable from SimpleDB. My approach is to save the data type information in the attribute name, with a colon (:) as the separator between the data type and the field name, for example System.Int32:ID.
This is a test DataTable:
Here is a screenshot of querying the corresponding SimpleDB domain with the AWS Explorer for Visual Studio, which is part of the AWS SDK for .NET.
For an ordinary SimpleDB domain like the one below:
I need to convert it to a DataTable as shown below. Since there is no data type information in this domain, I leave the data type of every column as string in the DataTable:
In SimpleDB, everything is stored as a UTF-8 string value, and all comparisons are performed lexicographically. As a result, we need to use negative-number offsets and zero padding, and store dates in an appropriate format.
Here is the code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using Amazon;
using Amazon.SimpleDB;
using Amazon.SimpleDB.Model;
using Amazon.SimpleDB.Util;
namespace DataTable2SimpleDb
{
/*
* Sample usage:
*
* DataTableSimpleDbConverter dtSdbConverter = new DataTableSimpleDbConverter();
*
* //DataTable dt = CreateTestDataTable();
* //Upload DataTable to SimpleDB
* dtSdbConverter.Datatable2SimpleDb(dt, "cloudTable");
*
* //CreateTestSimpleDB();
* //Retrieve data from SimpleDB, save into DataTable
* DataTable dt2 = dtSdbConverter.SimpleDb2Datatable("MyStore");
*
*/
/// <summary>
/// Converts between ADO.NET <see cref="DataTable"/> objects and Amazon SimpleDB domains.
/// </summary>
/// <remarks>
/// SimpleDB stores every value as a string, so numeric and date values are passed through
/// the AmazonSimpleDBUtil encoders before upload. To be able to rebuild a typed DataTable,
/// each attribute is named "TypeName:ColumnName", for example "System.Int32:ID".
/// Attributes without a resolvable type prefix are treated as plain string columns.
/// </remarks>
public class DataTableSimpleDbConverter
{
    private const int MAX_NUM_DIGITS = 10;
    private const int OFFSET_VALUE = int.MaxValue;
    private const string EMPTY_VALUE_PLACEHOLDER = "";
    private const char TYPE_SEPARATOR = ':';
    private const string DEFAULT_DATA_TYPE = "System.String";

    // Shared by every converter instance; created by the first constructor call.
    private static AmazonSimpleDB sdb;

    /// <summary>
    /// Creates a converter, creating the shared SimpleDB client on first use.
    /// </summary>
    public DataTableSimpleDbConverter()
    {
        if (sdb == null)
        {
            // Replace the placeholders with real credentials (preferably loaded from
            // configuration rather than kept in source).
            sdb = AWSClientFactory.CreateAmazonSimpleDBClient(
                "<your AWSAccessKey>",
                "<your AWSSecretKey>");
        }
    }

    /// <summary>
    /// Uploads every row of <paramref name="dt"/> to a SimpleDB domain, one item per row
    /// ("Item0", "Item1", ...), creating the domain if it does not exist yet.
    /// </summary>
    /// <param name="dt">The table to upload.</param>
    /// <param name="domainName">
    /// Target domain; when null or empty, the table's TableName is used instead.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="dt"/> is null.</exception>
    public void Datatable2SimpleDb(DataTable dt, string domainName = "")
    {
        if (dt == null)
        {
            throw new ArgumentNullException("dt");
        }

        // if domainName is omitted, use the datatable name as domain name
        if (string.IsNullOrEmpty(domainName))
        {
            domainName = dt.TableName;
        }

        CreateDomain(domainName);

        int rowCount = 0;
        foreach (DataRow row in dt.Rows)
        {
            string itemName = string.Format("Item{0}", rowCount++);
            PutAttributesRequest putAttrReq = new PutAttributesRequest()
                .WithDomainName(domainName).WithItemName(itemName);

            foreach (DataColumn col in dt.Columns)
            {
                // add data type, example: System.Int32:ID
                string attributeName = string.Format(
                    "{0}:{1}", col.DataType.ToString(), col.ColumnName);
                string stringValue = EncodeValue(row[col], col.DataType);

                putAttrReq.Attribute.Add(new ReplaceableAttribute()
                    .WithName(attributeName).WithValue(stringValue));
            }

            sdb.PutAttributes(putAttrReq);
        }
    }

    /// <summary>
    /// Downloads all items of a SimpleDB domain into a new <see cref="DataTable"/>.
    /// </summary>
    /// <param name="domainName">The domain to read.</param>
    /// <returns>
    /// The populated table (named after the domain), or null when the domain does not
    /// exist or the select call returns no result.
    /// </returns>
    public DataTable SimpleDb2Datatable(string domainName)
    {
        if (!DomainExist(domainName))
        {
            return null;
        }

        List<Item> items = SelectAllItems(domainName);
        if (items == null)
        {
            return null; // no data
        }

        DataTable dt = new System.Data.DataTable(domainName);

        // Attribute names (including the optional type prefix) in first-seen order;
        // each one maps to exactly one column of the table.
        List<string> attributeNames = new List<string>();

        //build the schema of DataTable
        foreach (Item item in items)
        {
            foreach (Amazon.SimpleDB.Model.Attribute attr in item.Attribute)
            {
                if (attributeNames.Contains(attr.Name))
                {
                    continue;
                }

                attributeNames.Add(attr.Name);

                string dataType;
                string colName;
                SplitTypedName(attr.Name, out dataType, out colName);
                dt.Columns.Add(new DataColumn(colName, Type.GetType(dataType)));
            }
        }

        //Fill record into DataTable
        foreach (Item item in items)
        {
            DataRow row = dt.NewRow();
            foreach (string attributeName in attributeNames)
            {
                List<Amazon.SimpleDB.Model.Attribute> attrs =
                    item.Attribute.FindAll(att => att.Name == attributeName);

                // A multi-valued attribute is flattened into one comma-separated string.
                string rawValue = string.Join(
                    ",",
                    attrs.ConvertAll(att => att.Value ?? EMPTY_VALUE_PLACEHOLDER).ToArray());

                string dataType;
                string colName;
                SplitTypedName(attributeName, out dataType, out colName);
                row[colName] = DecodeValue(rawValue, dataType);
            }
            dt.Rows.Add(row);
        }

        return dt;
    }

    /// <summary>
    /// Converts a cell value to the string form stored in SimpleDB.
    /// </summary>
    /// <param name="value">The cell value; may be null or DBNull.</param>
    /// <param name="dataType">The CLR type of the column the value belongs to.</param>
    /// <returns>The encoded string; the empty placeholder for null/DBNull cells.</returns>
    private static string EncodeValue(object value, Type dataType)
    {
        // Null cells cannot be fed to the numeric/date encoders; store them as the empty
        // placeholder so the attribute (and thus the column) still exists on download.
        if (value == null || Convert.IsDBNull(value))
        {
            return EMPTY_VALUE_PLACEHOLDER;
        }

        if (dataType == typeof(int) || dataType == typeof(short) || dataType == typeof(long))
        {
            // NOTE(review): Int64 values outside the Int32 range still overflow in
            // Convert.ToInt32; kept as-is so the stored format does not change.
            return AmazonSimpleDBUtil.EncodeZeroPadding(
                Convert.ToInt32(value),
                MAX_NUM_DIGITS);
        }

        if (dataType == typeof(decimal) || dataType == typeof(double) || dataType == typeof(float))
        {
            // NOTE(review): Decimal/Double are narrowed to Single here, so precision can be
            // lost; kept as-is so the stored format does not change.
            return AmazonSimpleDBUtil.EncodeRealNumberRange(
                Convert.ToSingle(value),
                MAX_NUM_DIGITS,
                MAX_NUM_DIGITS,
                OFFSET_VALUE);
        }

        if (dataType == typeof(DateTime))
        {
            return AmazonSimpleDBUtil.EncodeDate((DateTime)value);
        }

        return value.ToString();
    }

    /// <summary>
    /// Converts a stored string back to a value for a column of the given type.
    /// </summary>
    /// <param name="value">The stored (possibly comma-joined) attribute value.</param>
    /// <param name="dataType">Full CLR type name of the target column.</param>
    /// <returns>
    /// The decoded value, or DBNull for an empty value in a non-string column.
    /// </returns>
    private static object DecodeValue(string value, string dataType)
    {
        // An empty value in a typed column means "no value": the decoders below would
        // throw on it, and an empty string is not convertible to the column type.
        if (value.Length == 0 && dataType != DEFAULT_DATA_TYPE)
        {
            return DBNull.Value;
        }

        switch (dataType)
        {
            case "System.Int16":
            case "System.Int32":
            case "System.Int64":
                return AmazonSimpleDBUtil.DecodeZeroPaddingInt(value);
            case "System.Decimal":
            case "System.Double":
            case "System.Single":
                return AmazonSimpleDBUtil.DecodeRealNumberRangeFloat(
                    value,
                    MAX_NUM_DIGITS,
                    OFFSET_VALUE);
            case "System.DateTime":
                return AmazonSimpleDBUtil.DecodeDate(value);
            default:
                return value;
        }
    }

    /// <summary>
    /// Splits an attribute name of the form "TypeName:ColumnName" into its two parts.
    /// </summary>
    /// <param name="attributeName">The SimpleDB attribute name.</param>
    /// <param name="dataType">
    /// The type prefix when it names a type that Type.GetType can resolve;
    /// otherwise "System.String".
    /// </param>
    /// <param name="columnName">
    /// The text after the first colon when the prefix is a resolvable type;
    /// otherwise the whole attribute name.
    /// </param>
    private static void SplitTypedName(
        string attributeName, out string dataType, out string columnName)
    {
        // Defaults for domains that were not uploaded from a DataTable.
        dataType = DEFAULT_DATA_TYPE;
        columnName = attributeName;

        // Split on the FIRST colon only so column names may themselves contain colons.
        int separatorIndex = attributeName.IndexOf(TYPE_SEPARATOR);
        if (separatorIndex <= 0 || separatorIndex == attributeName.Length - 1)
        {
            return;
        }

        string candidateType = attributeName.Substring(0, separatorIndex);
        Type resolved;
        try
        {
            // Type.GetType returns null (it does not throw) for an unknown type name,
            // so the result must be checked explicitly.
            resolved = Type.GetType(candidateType);
        }
        catch (Exception)
        {
            // Malformed type names can still throw; deliberately treated the same as
            // "not a type prefix" so foreign domains load as plain string columns.
            resolved = null;
        }

        if (resolved == null)
        {
            return;
        }

        dataType = candidateType;
        columnName = attributeName.Substring(separatorIndex + 1);
    }

    /// <summary>
    /// Runs "select *" against the domain and collects the items of every result page.
    /// </summary>
    /// <param name="domainName">The domain to query.</param>
    /// <returns>All items, or null when the first response carries no select result.</returns>
    private static List<Item> SelectAllItems(string domainName)
    {
        // Quote the domain name with backticks (embedded backticks doubled) so names
        // containing characters such as '-' or '.' form a valid select expression.
        string selectExp = string.Format(
            "select * from `{0}`", domainName.Replace("`", "``"));

        List<Item> items = null;
        string nextToken = null;
        do
        {
            SelectRequest selectReq = new SelectRequest()
                .WithSelectExpression(selectExp);
            if (!string.IsNullOrEmpty(nextToken))
            {
                selectReq.WithNextToken(nextToken);
            }

            SelectResponse selectResp = sdb.Select(selectReq);
            if (!selectResp.IsSetSelectResult())
            {
                break;
            }

            SelectResult selectResult = selectResp.SelectResult;
            if (items == null)
            {
                items = new List<Item>();
            }
            items.AddRange(selectResult.Item);

            // A select response is paged; keep going while the service hands back a token.
            nextToken = selectResult.IsSetNextToken() ? selectResult.NextToken : null;
        }
        while (!string.IsNullOrEmpty(nextToken));

        return items;
    }

    /// <summary>
    /// Creates the domain unless it already exists.
    /// </summary>
    /// <param name="domainName">The domain to create.</param>
    private void CreateDomain(string domainName)
    {
        if (DomainExist(domainName))
        {
            return;
        }

        CreateDomainRequest createDomainRequest = new CreateDomainRequest()
            .WithDomainName(domainName);
        sdb.CreateDomain(createDomainRequest);
    }

    /// <summary>
    /// Checks whether a domain with exactly this name is listed for the account.
    /// </summary>
    /// <param name="domainName">The domain name to look for.</param>
    /// <returns>True when the domain is found on any page of the domain listing.</returns>
    private bool DomainExist(string domainName)
    {
        string nextToken = null;
        do
        {
            ListDomainsRequest listDomainRequest = new ListDomainsRequest();
            if (!string.IsNullOrEmpty(nextToken))
            {
                listDomainRequest.WithNextToken(nextToken);
            }

            ListDomainsResponse listDomainResponse = sdb.ListDomains(listDomainRequest);
            if (!listDomainResponse.IsSetListDomainsResult())
            {
                return false;
            }

            ListDomainsResult listDomainResult = listDomainResponse.ListDomainsResult;
            if (listDomainResult.DomainName.Contains(domainName))
            {
                return true;
            }

            // The listing is paged; follow the token until the last page.
            nextToken = listDomainResult.IsSetNextToken() ? listDomainResult.NextToken : null;
        }
        while (!string.IsNullOrEmpty(nextToken));

        return false;
    }
}
}
相关文章