使用 ndjson
什么是 ndjson ? (此段内容来自于网络)
ndjson 是一种 MIME 类型,用于表示 Newline Delimited JSON 格式的数据。
ndjson 是一种方便的格式,用于存储或流式传输结构化数据,每条记录可以逐行处理。
ndjson 的优势
- 流式处理:ndjson 使整个文件“流化”,将文件分割成许多份,避免了整体的束缚,支持局部处理,变得更灵活更快。
- 性能提升:在处理大数据集时,ndjson 格式可以显著提高性能,因为它允许逐行处理数据,而不需要一次性加载整个文件。
- 简化并行处理:由于每行都是一个独立的 JSON 对象,ndjson 格式的数据可以很容易地分割并行处理。
ndjson 和 json 的差异
传统的JSON数组
[
{"Name":"x1","Value":"aaa"}, {"Name":"x2","Value":"bbb"},
{"Name":"x3","Value":"ccc"}, {"Name":"x4","Value":"ddd"},
{"Name":"x5","Value":"eee"}
]
等效的 ndjson 格式
{"Name":"x1","Value":"aaa"}
{"Name":"x2","Value":"bbb"}
{"Name":"x3","Value":"ccc"}
{"Name":"x4","Value":"ddd"}
{"Name":"x5","Value":"eee"}
最直观的差别有2点:
- 没有一对中括号 []
- 一行一个json对象,用换行符分隔,不需要逗号
ndjson 的用途
由于没有一对 [] 的限制,并且一行一个json,它就非常容易能形成【数据流】,
再借助 .NET Stream API 的强大功能,就可以实现:只需要很小的缓冲区就能处理无限大的数据量
反之传统的 json 数组,它的限制在于:所有数据必须全部放在内存中,内存消耗较大。
ndjson 的典型使用场景
- 将数据库的查询结果输出成 ndjson,写入文件或者输出到网络流
- 服务端返回列表数据,以ndjson形式输出,客户端用流的方式逐行读取并处理
- 客户端发送列表数据,以ndjson形式上传,服务端用流的方式逐行读取并处理
下面通过几个示例来展示 ndjson 的使用方法,共分为2大类:
- 小数据量:可用于代替 json-array
- 无限数据量:ndjson 的优势体现
示例展示-1--SQL查询导出成ndjson并gzip压缩写入文件--无限数据量
using DbContext dbContext = DbContext.Create("connection_name");
await dbContext.OpenConnectionAsync();
dbContext.BeginTransaction(IsolationLevel.ReadUncommitted); // 建议导出数据允许脏读
CPQuery query = dbContext.CPQuery.Create("select * from xxxxx", args);
using( FileStream fileStream = File.OpenWrite("d:/big-query-gzip-data.ndjson") ) {
using( GZipStream gZipStream = new GZipStream(fileStream, CompressionMode.Compress, true) ) {
using( StreamWriter writer = new StreamWriter(gZipStream, EncodingUtils.UTF8NoBOM, 4096, true) ) {
int count = await query.ExportToNdJsonAsync(maxRows: -1, writer); // 执行SQL查询并导出数据
Console.WriteLine($"SQL查询执行完成,row-count: {count}");
}
}
}
示例展示-2--服务端以ndjson形式返回数据--小数据量
[HttpGet]
[Route("table/list")]
public NdjsonResult ClientList()
{
List<ClientChannel> list = ClientListUtils.GetList();
list = FilterList(list);
return new NdjsonResult(list); // 返回【小量级】数据
}
浏览器收到的响应头如下:
说明:NdjsonResult 在输出时默认会开启 gzip 压缩。
示例展示-3--服务端以ndjson形式返回数据--无限数据量
如果需要返回的数据量非常大,使用NdjsonResult就不合适了,
这里结合前面的示例来实现一个更完整的数据导出功能:
[HttpPost]
[Route("export/data/ndjson")]
public async Task DataExport()
{
using DbContext dbContext = DbContext.Create("connection_name");
await dbContext.OpenConnectionAsync();
dbContext.BeginTransaction(IsolationLevel.ReadUncommitted); // 建议导出数据允许脏读
CPQuery query = dbContext.CPQuery.Create("select * from xxxxx", args);
using( TempFileStream fileStream = new TempFileStream() ) { // 这里用临时文件来缓冲数据
using( GZipStream gZipStream = new GZipStream(fileStream, CompressionMode.Compress, true) ) {
using( StreamWriter writer = new StreamWriter(gZipStream, EncodingUtils.UTF8NoBOM, 4096, true) ) {
int count = await query.ExportToNdJsonAsync(maxRows: -1, writer); // 执行SQL查询并导出数据
dbContext.Dispose(); // ############## 提前关闭数据库连接
}
}
this.NHttpContext.Response.SetHeader(HttpHeaders.Response.ContentEncoding, "gzip");
// 直接将临时文件流复制到响应流
await this.NHttpContext.HttpReplyAsync(200, fileStream, ResponseContentType.Ndjson);
}
// 临时文件会在此处自动删除
}
示例展示-4--客户端接收ndjson数据并转成强类型对象--小数据量
[TestMethod]
public async Task Test2()
{
HttpOption httpOption = new HttpOption {
Url = "http://linuxtest:8208/v20/api/fides/data/table/list"
};
List<EndClientUserInfo> list = await httpOption.GetResultAsync<List<EndClientUserInfo>>();
Assert.IsTrue(list.Count > 0);
}
HTTP客户端自动识别了2个响应头
- Content-Type: application/x-ndjson
- Content-Encoding: gzip
示例展示-5--客户端接收ndjson数据并转成强类型对象--无限数据量
[TestMethod]
public async Task Test3()
{
HttpOption httpOption = new HttpOption {
Url = "http://linuxtest:8208/v20/api/fides/data/table/list"
};
HttpResult<Stream> httpResult = await httpOption.GetResultAsync<HttpResult<Stream>>();
int count = 0;
using NdJsonReader reader = NdJsonReader.Create(httpResult);
foreach(var item in reader.ReadLines<EndClientUserInfo>() ) {
count++;
}
Assert.IsTrue(count > 0);
}
示例展示-6-客户端以ndjson方式上传数据--小数据量
// 数据量不大,全部在内存中
List<Product3> list = Product3.CreateTestDataList(20);
HttpOption httpOption = new HttpOption {
Method = "POST",
Url = "http://www.fish-test.com/show-body.aspx",
Data = list,
Format = SerializeFormat.Ndjson // 注意这里
};
await httpOption.SendAsync();
示例展示-7-客户端以ndjson方式上传数据--无限数据量
将一个 流 对象中的数据发送到服务端,这里使用前面示例产生的临时文件
// 数据量比较大,用文件来缓存,这里使用 示例1 产生的文件
using FileStream fileStream = File.OpenRead("d:/big-query-gzip-data.ndjson");
HttpOption httpOption = new HttpOption {
Method = "POST",
Url = "http://www.fish-test.com/show-body.aspx",
Data = fileStream, // 这里直接使用流对象
Format = SerializeFormat.Ndjson,
Header = new {
Content_Encoding = "gzip" // 写文件时使用了gzip压缩,所以这里需要指定
},
Timeout = 60_000
};
await httpOption.SendAsync();
示例展示-8--服务端从HTTP请求流中读取ndjson--小数据量
[HttpPost]
[Route("import/data/ndjson")]
public async Task ImportData()
{
List<Product3> list = this.NHttpContext.Request.ReadBodyAsNdjonsToList<Product3>();
using DbContext dbContext = DbContext.Create("connection_name");
foreach( var item in list ) {
await dbContext.Entity.InsertAsync(item);
}
}
示例展示-9--服务端从HTTP请求流中读取ndjson--无限数据量
[HttpPost]
[Route("import/data/ndjson")]
public async Task ImportData()
{
using NdJsonReader reader = NdJsonReader.Create(this.NHttpContext.Request);
using DbContext dbContext = DbContext.Create("connection_name");
foreach( var item in reader.ReadLines<Product3>() ) {
await dbContext.Entity.InsertAsync(item);
}
}