使用 ndjson

什么是 ndjson ? (此段内容来自于网络)

ndjson 是一种 MIME 类型,用于表示 Newline Delimited JSON 格式的数据。

ndjson 是一种方便的格式,用于存储或流式传输结构化数据,每条记录可以逐行处理。

ndjson 的优势

  • 流式处理:ndjson 使整个文件“流化”,将文件分割成许多份,避免了整体的束缚,支持局部处理,变得更灵活更快。
  • 性能提升:在处理大数据集时,ndjson 格式可以显著提高性能,因为它允许逐行处理数据,而不需要一次性加载整个文件。
  • 简化并行处理:由于每行都是一个独立的 JSON 对象,ndjson 格式的数据可以很容易地分割并行处理。




ndjson 和 json 的差异

传统的JSON数组

[
  {"Name":"x1","Value":"aaa"}, {"Name":"x2","Value":"bbb"},
  {"Name":"x3","Value":"ccc"}, {"Name":"x4","Value":"ddd"},
  {"Name":"x5","Value":"eee"}
]

等效的 ndjson 格式

{"Name":"x1","Value":"aaa"}
{"Name":"x2","Value":"bbb"}
{"Name":"x3","Value":"ccc"}
{"Name":"x4","Value":"ddd"}
{"Name":"x5","Value":"eee"}

最直观的差别有2点:

  • 没有一对中括号 []
  • 一行一个json对象,用换行符分隔,不需要逗号

ndjson 的用途

由于没有一对 [] 的限制,并且一行一个json,它就非常容易能形成【数据流】,

再借助 .NET Stream API 的强大功能,就可以实现:只需要很小的缓冲区就能处理无限大的数据量

反之传统的 json 数组,它的限制在于:所有数据必须全部放在内存中,内存消耗较大。


ndjson 的典型使用场景

  1. 将数据库的查询结果输出成 ndjson,写入文件或者输出到网络流
  2. 服务端返回列表数据,以ndjson形式输出,客户端用流的方式逐行读取并处理
  3. 客户端发送列表数据,以ndjson形式上传,服务端用流的方式逐行读取并处理

下面通过几个示例来展示 ndjson 的使用方法,共分为2大类:

  • 小数据量:可用于代替 json-array
  • 无限数据量:ndjson 的优势体现



示例展示-1--SQL查询导出成ndjson并gzip压缩写入文件--无限数据量

using DbContext dbContext = DbContext.Create("connection_name");
await dbContext.OpenConnectionAsync();
dbContext.BeginTransaction(IsolationLevel.ReadUncommitted);  // 建议导出数据允许脏读

CPQuery query = dbContext.CPQuery.Create("select * from xxxxx", args);

using( FileStream fileStream = File.OpenWrite("d:/big-query-gzip-data.ndjson") ) {
	using( GZipStream gZipStream = new GZipStream(fileStream, CompressionMode.Compress, true) ) {
		using( StreamWriter writer = new StreamWriter(gZipStream, EncodingUtils.UTF8NoBOM, 4096, true) ) {

		int count = await query.ExportToNdJsonAsync(maxRows: -1, writer);   // 执行SQL查询并导出数据
			Console.WriteLine($"SQL查询执行完成,row-count: {count}");
		}
	}
}



示例展示-2--服务端以ndjson形式返回数据--小数据量

[HttpGet]
[Route("table/list")]
public NdjsonResult ClientList()
{
	List<ClientChannel> list = ClientListUtils.GetList();
	list = FilterList(list);

	return new NdjsonResult(list);    // 返回【小量级】数据
}

浏览器收到的响应头如下:

xx

说明:NdjsonResult 在输出时默认会开启 gzip 压缩。



示例展示-3--服务端以ndjson形式返回数据--无限数据量

如果需要返回的数据量非常大,使用NdjsonResult就不合适了,

这里结合前面的示例来实现一个更完整的数据导出功能:

[HttpPost]
[Route("export/data/ndjson")]
public async Task DataExport()
{
	using DbContext dbContext = DbContext.Create("connection_name");
	await dbContext.OpenConnectionAsync();
	dbContext.BeginTransaction(IsolationLevel.ReadUncommitted);  // 建议导出数据允许脏读

	CPQuery query = dbContext.CPQuery.Create("select * from xxxxx", args);

	using( TempFileStream fileStream = new TempFileStream() ) {  // 这里用临时文件来缓冲数据

		using( GZipStream gZipStream = new GZipStream(fileStream, CompressionMode.Compress, true) ) {
			using( StreamWriter writer = new StreamWriter(gZipStream, EncodingUtils.UTF8NoBOM, 4096, true) ) {

				int count = await query.ExportToNdJsonAsync(maxRows: -1, writer);   // 执行SQL查询并导出数据

				dbContext.Dispose();   // ############## 提前关闭数据库连接
			}
		}

		this.NHttpContext.Response.SetHeader(HttpHeaders.Response.ContentEncoding, "gzip");

		// 直接将临时文件流复制到响应流
		await this.NHttpContext.HttpReplyAsync(200, fileStream, ResponseContentType.Ndjson);
	}
	// 临时文件会在此处自动删除
}



示例展示-4--客户端接收ndjson数据并转成强类型对象--小数据量

[TestMethod]
public async Task Test2()
{
	HttpOption httpOption = new HttpOption {
		Url = "http://linuxtest:8208/v20/api/fides/data/table/list"
	};

	List<EndClientUserInfo> list = await httpOption.GetResultAsync<List<EndClientUserInfo>>();

	Assert.IsTrue(list.Count > 0);
}

HTTP客户端自动识别了2个响应头

  • Content-Type: application/x-ndjson
  • Content-Encoding: gzip



示例展示-5--客户端接收ndjson数据并转成强类型对象--无限数据量

[TestMethod]
public async Task Test3()
{
	HttpOption httpOption = new HttpOption {
		Url = "http://linuxtest:8208/v20/api/fides/data/table/list"
	};

	HttpResult<Stream> httpResult = await httpOption.GetResultAsync<HttpResult<Stream>>();

	int count = 0;
	using NdJsonReader reader = NdJsonReader.Create(httpResult);

	foreach(var item in reader.ReadLines<EndClientUserInfo>() ) {
		count++;
	}

	Assert.IsTrue(count > 0);
}



示例展示-6-客户端以ndjson方式上传数据--小数据量

// 数据量不大,全部在内存中
List<Product3> list = Product3.CreateTestDataList(20);

HttpOption httpOption = new HttpOption {
	Method = "POST",
	Url = "http://www.fish-test.com/show-body.aspx",
	Data = list,
	Format = SerializeFormat.Ndjson     // 注意这里
};

await httpOption.SendAsync();



示例展示-7-客户端以ndjson方式上传数据--无限数据量

将一个 流 对象中的数据发送到服务端,这里使用前面示例产生的临时文件

// 数据量比较大,用文件来缓存,这里使用 示例1 产生的文件
using FileStream fileStream = File.OpenRead("d:/big-query-gzip-data.ndjson");

HttpOption httpOption = new HttpOption {
	Method = "POST",
	Url = "http://www.fish-test.com/show-body.aspx",
	Data = fileStream,      // 这里直接使用流对象
	Format = SerializeFormat.Ndjson,
	Header = new {
		Content_Encoding = "gzip"  // 写文件时使用了gzip压缩,所以这里需要指定
	},
	Timeout = 60_000
};
await httpOption.SendAsync();



示例展示-8--服务端从HTTP请求流中读取ndjson--小数据量

[HttpPost]
[Route("import/data/ndjson")]
public async Task ImportData()
{
	List<Product3> list = this.NHttpContext.Request.ReadBodyAsNdjonsToList<Product3>();

    using DbContext dbContext = DbContext.Create("connection_name");

	foreach( var item in list ) {
		await dbContext.Entity.InsertAsync(item);
	}
}



示例展示-9--服务端从HTTP请求流中读取ndjson--无限数据量

[HttpPost]
[Route("import/data/ndjson")]
public async Task ImportData()
{
	using NdJsonReader reader = NdJsonReader.Create(this.NHttpContext.Request);

	using DbContext dbContext = DbContext.Create("connection_name");

	foreach( var item in reader.ReadLines<Product3>() ) {
		await dbContext.Entity.InsertAsync(item);
	}
}