标签:date tst hash 项目 object 选择 topic list() offset
.NET VS工具添加程序包源 在NuGet包管理中选择程序包源为上面添加的私有仓库。 搜索Data.Pipelines并安装。 在app.congif或者web.config中添加Kafka配置
appSettings>
add key="kafka.ip" value="172.20.105.205"/>
add key="kafka.prot" value="9092"/>
appSettings>
///
/// 实时数据综合推送
///
///
public void RealTimeDataPush(List listTemp, List listTempDetail)
{
var listData = new Liststring, object>>();
foreach (var item in listTemp)
{
var d = new Dictionarystring, object>();
d.Add("XH", item.XH);//序号
d.Add("DXMC", item.DXMC);//对象名称
d.Add("DXBH", item.DXBH);//对象编号
d.Add("CDBH", item.CDBH);//测点编号
d.Add("CDMC", item.CDMC);//测点名称
d.Add("XZQDM", item.XZQDM);//行政区代码
d.Add("SSXZQ", item.SSXZQ);//所属行政区
d.Add("JCSJ", item.JCSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//监测时间
//d.Add("JCN", item.JCN);//监测年
//d.Add("JCY", item.JCY);//监测月
//d.Add("JCR", item.JCR);//监测日
//d.Add("JCXS", item.JCXS);//监测小时
d.Add("SFZX", item.SFZX);//是否在线
if (!string.IsNullOrEmpty(item.ZXBZBH))
d.Add("ZXBZBH", item.ZXBZBH);//执行标准编号
else
d.Add("ZXBZBH", "");
if (!string.IsNullOrEmpty(item.ZXBZMC))
d.Add("ZXBZMC", item.ZXBZMC);//执行标准名称
else
d.Add("ZXBZMC", "");
if (!string.IsNullOrEmpty(item.SZLB))
d.Add("SZLB", item.SZLB);//水质类别
else
d.Add("SZLB", "");
if (!string.IsNullOrEmpty(item.SFCB))
d.Add("SFCB", item.SFCB);//是否超标
else
d.Add("SFCB", "");
if (!string.IsNullOrEmpty(item.CBXM))
d.Add("CBXM", item.CBXM);//超标项目
else
d.Add("CBXM", "");
//d.Add("YQBM", item.YQBM);//仪器编码
//d.Add("YQMC", item.YQMC);//仪器名称
//d.Add("YQXH", item.YQXH);//仪器型号
//d.Add("YQZT", item.YQZT);//仪器状态
d.Add("ORGID", item.ORGID);//机构代码
d.Add("CJR", item.CJR);//创建人
d.Add("CJSJ", item.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
d.Add("GXR", item.GXR);//更新人
d.Add("SJLY", item.SJLY);//数据来源
d.Add("SJZT", item.SJZT);//数据状态
d.Add("GXSJ", item.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间
#region 明细
var listData2 = new Liststring, object>>();
var listTempDetail2 = listTempDetail.Where(p => p.CYXH == item.XH);
foreach (var item2 in listTempDetail2)
{
var d2 = new Dictionarystring, object>();
d2.Add("XH", item2.XH);//序号
d2.Add("CYXH", item2.CYXH);//采样序号
if (!string.IsNullOrEmpty(item2.FXXMDM))
d2.Add("FXXMDM", item2.FXXMDM);//分析项目编号
else
d2.Add("FXXMDM", "");
if (!string.IsNullOrEmpty(item2.FXXMMC))
d2.Add("FXXMMC", item2.FXXMMC);//分析项目名称
else
d2.Add("FXXMMC", "");
d2.Add("BCJG", item2.BCJG);//报出结果
d2.Add("BCJGBS", item2.BCJGBS);//报出结果表示
d2.Add("BCJGDW", item2.BCJGDW);//报出结果单位
if (!string.IsNullOrEmpty(item2.BZSX))
d2.Add("BZSX", item2.BZSX);//标准上限
else
d2.Add("BZSX", "");
if (!string.IsNullOrEmpty(item2.BZXX))
d2.Add("BZXX", item2.BZXX);//标准下限
else
d2.Add("BZXX", "");
if (!string.IsNullOrEmpty(item2.SFCB))
d2.Add("SFCB", item2.SFCB);//是否超标
else
d2.Add("SFCB", "");
if (!string.IsNullOrEmpty(item2.CBBS))
d2.Add("CBBS", item2.CBBS);//超标倍数
else
d2.Add("CBBS", "");
if (!string.IsNullOrEmpty(item2.SZLB))
d2.Add("SZLB", item2.SZLB);//水质类别
else
d2.Add("SZLB", "");
if (!string.IsNullOrEmpty(item2.SZLB))
d2.Add("YQSBXH", item2.YQSBXH);//仪器编码
else
d2.Add("YQSBXH", "");
d2.Add("ORGID", item2.ORGID);//机构代码
d2.Add("CJR", item2.CJR);//创建人
d2.Add("CJSJ", item2.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
d2.Add("GXR", item2.GXR);//更新人
d2.Add("SJLY", item2.SJLY);//数据来源
d2.Add("SJZT", item2.SJZT);//数据状态
d2.Add("GXSJ", item2.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间
listData2.Add(d2);
}
#endregion
d.Add("JCJG", listData2);
listData.Add(d);
}
//SZZDZ_CYXX
var product = new DataProducer("dfb426e321fc417981858b0927c21016", "XH");
//分页推送
int PageSize = 100;
int PageTotal = (int)Math.Ceiling((decimal)listData.Count / PageSize);
for (int PageIndex = 0; PageIndex )
{
var data = listData.Skip(PageIndex * PageSize).Take(PageSize).ToList();
product.Send(data, r => Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}"));
}
Assert.IsTrue(true);
//Thread.Sleep(500);
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka;
//using Newtonsoft.Json;
using System.Configuration;
using Newtonsoft.Json;
namespace Common
{
class DataProducer
{
private readonly string _key;
private readonly string _dataKeyField;
// 从配置读取kafka配置
static string ip = ConfigurationManager.AppSettings["kafka.ip"];
static string port = ConfigurationManager.AppSettings["kafka.port"];
///
/// 构造方法
///
/// 业务数据队列key
/// 数据主键
public DataProducer(string key, string dataKeyField)
{
this._key = key;
this._dataKeyField = dataKeyField;
}
///
/// 发送数据
///
/// 数据集
/// 发送成功后回调,在发送成功前,不要杀死相关线程,否则可能导致数据丢失
public void Send(Liststring, object>> data, Actionstring, string>> callback)
{
foreach (var datum in data)
{
var keys = new Liststring>(datum.Count);
// 找出时候类型的字段
foreach (var entry in datum)
{
if (entry.Value is DateTime)
{
keys.Add(entry.Key);
}
}
// 把时间转成long,与Java的getTime()兼容
foreach (var key in keys)
{
var eeee = (DateTime)datum[key];
// TODO 日期转换
datum[key] = "日期字符串";
}
}
var topic = this._key;
var d = new Dictionarystring, object> { { "data", data }, { "keyField", this._dataKeyField } };
var jsonString = JsonConvert.SerializeObject(d, new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore });
// TODO 生成key
Send(topic, jsonString.GetHashCode().ToString(), jsonString, callback);
}
///
/// 发送消息
///
/// KafkaTopic
/// 消息key
/// 数据
/// 发送完成回调
private void Send(string topic, string key, string value, Actionstring, string>> callback)
{
if (string.IsNullOrEmpty(ip))
{
ip = "172.20.105.205";
}
if (string.IsNullOrEmpty(port))
{
port = "9092";
}
var conf = new ProducerConfig { BootstrapServers = $"{ip}:{port}" };
using (var p = new ProducerBuilderstring, string>(conf).Build())
{
// TODO 异常处理,添加日志
p.Produce(topic, new Messagestring, string> { Key = key, Value = value }, callback);
//Console.WriteLine($"topic: {topic}. key: k, value: " + value);
// TODO 异常处理,添加日志
// wait for up to 1 seconds for any inflight messages to be delivered.
p.Flush(TimeSpan.FromSeconds(1));
}
}
}
}
.net推送数据之Kafka
标签:date tst hash 项目 object 选择 topic list() offset
原文地址:https://www.cnblogs.com/elves/p/12274823.html