开发手册 - 连接器对象 API¶
不同的连接器对象具有不同的 API 及操作方式,使用时应以实际情况为准。
此外,DataWay 和 DataKit 由于迭代更新较快,其本身的接口也存在变化的情况。因此本文档始终以最新版为准。
提示:在较早版本中,曾名为「数据源」,现版本已改为「连接器」
1. DataKit¶
DataKit 连接器操作对象主要提供数据写入方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
source |
str | None |
指定 Source(注意不要填写"mysql" 等,防止与其他采集器冲突混淆) |
DataKit.write_by_category(...)¶
write_by_category(...)
方法用于向 DataKit 写入特定类型的数据,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
category |
str | 必须 | 数据类型,详见 DataKit API 文档,/v1/write/:category |
measurement |
str | 必须 | 指标集名称 |
tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
status_code, result = dk.write_by_category(category='metric', measurement='主机监控', tags={'host': 'web-01'}, fields={'cpu': 10})
DataKit.write_by_category_many(...)¶
write_by_category(...)
的批量版本,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
category |
str | 必须 | 数据类型,详见 DataKit API 文档,/v1/write/:category |
data |
list | 必须 | 数据点列表 |
data[#].measurement |
str | 必须 | 指标集名称 |
data[#].tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
data[#].fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
data[#].timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
data = [
{ 'measurement': '主机监控',
'tags': {'host': 'web-01'}, 'fields': {'value': 10} },
{ 'measurement': '主机监控',
'tags': {'host': 'web-02'}, 'fields': {'value': 20} },
]
status_code, result = dk.write_by_category_many(category='metric', data=data)
DataKit.write_metric(...)¶
write_metric(...)
方法用于向 DataKit 写入指标数据,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
measurement |
str | 必须 | 指标集名称 |
tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
status_code, result = dk.write_metric(measurement='主机监控', tags={'host': 'web-01'}, fields={'cpu': 10})
DataKit.write_metric_many(...)¶
write_metric(...)
的批量版本,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data |
list | 必须 | 数据点列表 |
data[#].measurement |
str | 必须 | 指标集名称 |
data[#].tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
data[#].fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
data[#].timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
data = [
{ 'measurement': '主机监控',
'tags': {'host': 'web-01'}, 'fields': {'value': 10} },
{ 'measurement': '主机监控',
'tags': {'host': 'web-02'}, 'fields': {'value': 20} },
]
status_code, result = dk.write_metric_many(data=data)
DataKit.write_logging(...) / DataKit.write_logging_many(...)¶
用于向 DataKit 写入日志数据,参数与DataKit.write_metric(...)
/ DataKit.write_metric_many(...)
相同
DataKit.query(...)¶
query(...)
方法用于通过 DataKit 执行 DQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
dql |
str | 必须 | DQL 语句 |
dict_output |
bool | False |
是否自动转换数据为dict 。 |
raw |
bool | False |
是否返回原始响应。开启后dict_output 参数无效。 |
all_series |
bool | False |
是否自动通过slimit 和soffset 翻页以获取全部时间线。 |
{DataKit 原生参数} |
- | - | 透传至queries[0].{DataKit 原生参数} |
本方法支持 DataKit API DQL 查询接口中的参数,详细文档见 DataKit API 文档 - POST /v1/query/raw。
示例如下:
查询并以 Dict 形式返回数据
import time
import json
@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
datakit = DFF.SRC('datakit')
# 使用 DataKit 原生参数`time_range`,限制最近 1 小时数据
time_range = [
int(time.time() - 3600) * 1000,
int(time.time()) * 1000,
]
status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', dict_output=True, time_range=time_range)
print(json.dumps(result))
输出示例:
{
"series": [
[
{
"time": 1622463105293,
"host": "iZbp152ke14timzud0du15Z",
"load": 2.18,
"create_time": 1622429576363,
"tags": {}
},
{
"time": 1622462905921,
"host": "ubuntu18-base",
"load": 0.08,
"create_time": 1622268259114,
"tags": {}
},
{
"time": 1622461264175,
"host": "shenrongMacBook.local",
"load": 2.395508,
"create_time": 1622427320834,
"tags": {}
}
]
]
}
示例如下:
查询并以 DataKit 原始返回值格式返回数据
import time
import json
@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
datakit = DFF.SRC('datakit')
# 添加 raw 参数,获取 DQL 查询原始值
time_range = [
int(time.time() - 3600) * 1000,
int(time.time()) * 1000,
]
status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', raw=True, time_range=time_range)
print(json.dumps(result, indent=2))
输出示例:
{
"content": [
{
"series": [
{
"name": "HOST",
"columns": [
"time",
"host",
"load",
"create_time"
],
"values": [
[
1622463165152,
"iZbp152ke14timzud0du15Z",
1.92,
1622429576363
],
[
1622462905921,
"ubuntu18-base",
0.08,
1622268259114
],
[
1622461264175,
"shenrongMacBook.local",
2.395508,
1622427320834
]
]
}
],
"cost": "1ms",
"total_hits": 3
}
]
}
DataKit.get(...)¶
get(...)
方法用于向 DataKit 发送一个 GET 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
path |
str | 必须 | 请求路径 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataKit API 文档
DataKit.post_line_protocol(...)¶
post_line_protocol(...)
方法用于向 DataKit 以行协议格式发送一个 POST 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
path |
str | 必须 | 请求路径 |
points |
list | 必须 | 数据点格式的数据列表 |
points[#].measurement |
str | 必须 | 指标集名称 |
points[#].tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
points[#].fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
points[#].timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataKit API 文档
注意:从1.6.8
开始,参数path
调整为第一个参数
DataKit.post_json(...)¶
post_json(...)
方法用于向 DataKit 以 JSON 格式发送一个 POST 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
path |
str | 必须 | 请求路径 |
json_obj |
dict/list | 必须 | 需要发送的 JSON 对象 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataKit API 文档
注意:从1.6.8
开始,参数path
调整为第一个参数
2. DataWay¶
DataWay 连接器操作对象主要提供数据写入方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
token |
str | None |
指定 Token |
rp |
str | None |
制定 rp |
DataWay.write_point(...) / write_metric(...)¶
write_point(...)
方法用于向 DataWay 写入数据点,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
measurement |
str | 必须 | 指标集名称 |
tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
status_code, result = dw.write_point(measurement='主机监控', tags={'host': 'web-01'}, fields={'cpu': 10})
DataWay.write_points(...) / write_metrics(...)¶
write_point(...)
的批量版本,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
points |
list | 必须 | 数据点列表 |
points[#].measurement |
str | 必须 | 指标集名称 |
points[#].tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
points[#].fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
points[#].timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
示例如下:
points = [
{ 'measurement': '主机监控',
'tags': {'host': 'web-01'}, 'fields': {'value': 10} },
{ 'measurement': '主机监控',
'tags': {'host': 'web-02'}, 'fields': {'value': 20} },
]
status_code, result = dw.write_points(points)
DataWay.get(...)¶
get(...)
方法用于向 DataWay 发送一个 GET 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
path |
str | 必须 | 请求路径 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataWay 官方文档
DataWay.post_line_protocol(...)¶
post_line_protocol(...)
方法用于向 DataWay 以行协议格式发送一个 POST 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
points |
list | 必须 | 数据点格式的数据列表 |
points[#].measurement |
str | 必须 | 指标集名称 |
points[#].tags |
dict | 必须 | 标签。键名和键值必须都为字符串 |
points[#].fields |
dict | 必须 | 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一 |
points[#].timestamp |
int/long/float | {当前时间} | 时间戳,支持秒/毫秒/微秒/纳秒。 |
path |
str | "/v1/write/metric" |
请求路径 默认写入指标,需要写入其他数据的请指定本参数 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
with_rp |
bool | False |
是否自动将配置的 rp 信息附在 query 中作为参数一起发送 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataWay 官方文档
DataWay.post_json(...)¶
post_json(...)
方法用于向 DataWay 以 JSON 格式发送一个 POST 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
json_obj |
dict/list | 必须 | 需要发送的 JSON 对象 |
path |
str | 必须 | 请求路径 |
query |
dict | None |
请求 URL 参数 |
headers |
dict | None |
请求 Header 参数 |
with_rp |
bool | False |
是否自动将配置的 rp 信息附在 query 中作为参数一起发送 |
本方法为通用处理方法,具体参数格式、内容等请参考 DataWay 官方文档
3. Sidecar¶
使用 Sidecar 连接器操作对象允许用户调用 Sidecar 执行 Shell 命令。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
有关 Sidecar 的完整使用文档,请参考「Sidecar 手册」
SidecarHelper.shell(...)¶
shell(...)
方法用于执行调用 Sidecar 执行 Shell 命令,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
cmd |
str | 必须 | 需要执行的 Shell 命令 如: "ls -l" |
wait |
bool | True |
是否等待执行完成 设置为 False 时,本函数会立刻返回,并且不会返回终端输出 |
workdir |
str | None |
Shell 命令执行的工作目录 如: "/home/dev" |
envs |
dict | None |
环境变量,键和值都为字符串 如: {"MY_NAME": "Tom"} |
callback_url |
str | None |
回调地址,命令执行后,将stdout 和stderr 使用 POST 方式发送至指定 URL一般和 wait=False 参数一起使用,实现异步回调 |
timeout |
int | 3 |
请求超时时间 注意:本参数并不是 Shell 命令的超时时间,而是 Func 请求 Sidecar 的超时时间 即 Func 请求 Sidecar 可能会超时,但所执行的 Shell 命令并不会因此停止 |
执行后回调¶
调用SidecarHelper.shell(...)
并指定callback_url
参数后,Sidecar 会在执行完 Shell 命令后将标准输出stdout
和标准错误stderr
以 POST 方式发送至此地址。
具体结构如下:
POST {callback_url}
Content-Type: application/json
{
"kwargs": {
"stdout": "<标准输出文本>",
"stderr": "<标准错误文本>"
}
}
此结构与 DataFlux Func 的「授权链接标准 POST 方式」匹配,可直接使用「授权链接」接收执行后的回调。
4. InfluxDB¶
InfluxDB 连接器操作对象为 Python 第三方包 influxdb(版本 5.2.3)的封装,主要提供一些用于查询 InfluxDB 的方法。 本连接器兼容以下数据库:
- 阿里云时序数据库 InfluxDB 版
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
InfluxDBHelper.query(...)¶
query(...)
方法用于执行 InfluxQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | InfluxQL 语句,可包含绑定参数占位符,形式为$var_name |
bind_params |
dict | None |
绑定参数 |
database |
str | None |
本次查询指定数据库 |
dict_output |
dict | False |
返回数据自动转换为{列名:值}形式 |
示例如下:
sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
db_res = db.query(sql, bind_params=bind_params, database='demo')
# {'series': [{'columns': ['time', 'city', 'hostname', 'status', 'value'], 'name': 'demo', 'values': [['2018-12-31T16:00:10Z', 'hangzhou', 'webserver', 'UNKNOW', 90], ['2018-12-31T16:00:20Z', 'hangzhou', 'jira', 'running', 40], ['2018-12-31T16:00:50Z', 'hangzhou', 'database', 'running', 50], ['2018-12-31T16:01:00Z', 'hangzhou', 'jira', 'stopped', 40], ['2018-12-31T16:02:00Z', 'hangzhou', 'rancher', 'UNKNOW', 90]]}]}
sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
db_res = db.query(sql, bind_params=bind_params, database='demo', dict_output=True)
# {'series': [[{'city': 'hangzhou', 'hostname': 'webserver', 'status': 'UNKNOW', 'time': '2018-12-31T16:00:10Z', 'value': 90}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'running', 'time': '2018-12-31T16:00:20Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'database', 'status': 'running', 'time': '2018-12-31T16:00:50Z', 'value': 50}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'stopped', 'time': '2018-12-31T16:01:00Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'rancher', 'status': 'UNKNOW', 'time': '2018-12-31T16:02:00Z', 'value': 90}]]}
InfluxDBHelper.query2(...)¶
query2(...)
方法同样用于执行 InfluxQL 语句,但参数占位符不同,使用问号?
作为参数占位符。参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | InfluxQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
InfluxQL 参数 |
database |
str | None |
本次查询指定数据库 |
dict_output |
dict | False |
返回数据自动转换为{"列名": "值"} 形式 |
示例如下:
sql = 'SELECT * FROM ?? WHERE city = ? LIMIT 5'
sql_params = ['demo', 'hangzhou']
db_res = db.query2(sql, sql_params=sql_params, dict_output=True)
InfluxDBHelper.write_point(...)¶
1.1.13
版本新增
write_point(...)
方法用于写入单个数据点。参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
measurement |
str | 必须 | 指标集 |
fields |
dict{str: str/int/float/bool} | 必须 | 字段 键名必须为 str 键值可以为 str/int/float/bool |
tags |
dict{str: str} | None |
标签 键名、键值必须为都为 str |
timestamp |
str/int | None |
时间 ISO 格式,如: 2020-01-01T01:02:03Z UNIX 时间戳如: 1577840523 |
database |
str | None |
本次写入指定数据库 |
示例如下:
fields = { 'cpu': 100, 'mem': 0.5 }
tags = { 'host': 'web001' }
db_res = db.write_point(measurement='host_monitor', fields=fields, tags=tags)
InfluxDBHelper.write_points(...)¶
1.1.13
版本新增
write_points(...)
方法用于批量写入数据点。参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
points |
list | 必须 | 数据点数组 |
points[#]['measurement'] |
str | 必须 | 指标集 |
points[#]['fields'] |
dict{str: str/int/float/bool} | 必须 | 字段 键名必须为 str 键值可以为 str/int/float/bool |
points[#]['tags'] |
dict{str: str} | None |
标签 键名、键值必须为都为 str |
points[#]['time'] |
str/int | None |
时间 ISO 格式,如: 2020-01-01T01:02:03Z UNIX 时间戳如: 1577840523 |
database |
str | None |
本次写入指定数据库 |
points = [
{
'measurement': 'host_monitor',
'fields' : { 'cpu': 100, 'mem': 0.5 },
'tags' : { 'host': 'web001' },
}
]
db_res = db.write_points(points)
5. MySQL¶
MySQL 连接器操作对象主要提供一些操作 MySQL 的方法。 本连接器以下数据库:
- MariaDB
- Percona Server for MySQL
- 阿里云 PolarDB MySQL
- 阿里云 OceanBase
- 阿里云分析型数据库 (ADB) MySQL 版
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
MySQLHelper.query(...)¶
query(...)
方法用于执行 SQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'SELECT * FROM ?? WHERE seq > ?'
sql_params = ['demo', 1]
db_res = db.query(sql, sql_params=sql_params)
MySQLHelper.non_query(...)¶
non_query(...)
方法用于执行增、删、改等 SQL 语句,返回影响行数。参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'DELETE FROM ?? WHERE id = ?'
sql_params = ['demo', 1]
effected_rows = db.non_query(sql, sql_params=sql_params)
6. Redis¶
Redis 连接器操作对象主要提供 Redis 的操作方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
RedisHelper.query(...)¶
query(...)
方法用于执行 Redis 命令,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
*args |
[str] | 必须 | Redis 命令及参数 |
示例如下:
注意:Redis 返回的值类型为bytes
。实际操作时,可以根据需要进行类型转换
db_res = db.query('GET', 'intValue')
print(int(db_res))
db_res = db.query('GET', 'strValue')
print(str(db_res))
db_res = db.query('GET', 'jsonValue')
print(json.loads(db_res))
7. Memcached¶
Memcached 连接器操作对象主要提供 Memcached 的操作方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
MemcachedHelper.query(...)¶
query(...)
方法用于执行 Memcached 命令,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
*args |
[str] | 必须 | Memcached 命令及参数 |
示例如下:
8. ClickHouse¶
ClickHouse 连接器操作对象主要提供一些数据方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
ClickHouseHelper.query(...)¶
query(...)
方法用于执行 SQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'SELECT * FROM ?? WHERE age > ?'
sql_params = ['demo_table', 50]
db_res = helper.query(sql, sql_params=sql_params)
9. Oracle Database¶
Oracle Database 连接器操作对象主要提供 Oracle Database 的操作方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
OracleDatabaseHelper.query(...)¶
query(...)
方法用于执行 SQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'SELECT * FROM ?? WHERE seq > ?'
sql_params = ['demo', 1]
db_res = db.query(sql, sql_params=sql_params)
10. Microsoft SQL Server¶
Microsoft SQL Server 连接器操作对象主要提供 Microsoft SQL Server 的操作方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
SQLServerHelper.query(...)¶
query(...)
方法用于执行 SQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'SELECT * FROM ?? WHERE seq > ?'
sql_params = ['demo', 1]
db_res = db.query(sql, sql_params=sql_params)
11. PostgreSQL¶
PostgreSQL 连接器操作对象主要提供一些操作 PostgreSQL 的方法。 本连接器以下数据库:
- Greenplum Database
- 阿里云 PolarDB MySQL
- 阿里云分析型数据库 (ADB) PostgreSQL 版
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
PostgreSQLHelper.query(...)¶
query(...)
方法用于执行 SQL 语句,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
sql |
str | 必须 | SQL 语句,可包含参数占位符。? 表示需要转义的参数;?? 表示不需要转义的参数 |
sql_params |
list | None |
SQL 参数 |
示例如下:
sql = 'SELECT * FROM ?? WHERE seq > ?'
sql_params = ['demo', 1]
db_res = db.query(sql, sql_params=sql_params)
12. mongoDB¶
mongoDB 连接器操作对象主要提供一些操作 mongoDB 的方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
database |
str | None |
指定数据库 |
MongoDBHelper.db(...)¶
db(...)
方法用于获取数据库操作对象,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
db_name |
str | None |
数据库名。未传递名称时,返回默认数据库操作对象。 |
示例如下:
# 获取默认数据库对象
db = helper.db()
# 获取指定数据库对象
db = helper.db('some_db')
# 获取集合对象
collection = db['some_collection']
# 查询处理
data = collection.find_one()
# 写成一行
data = helper.db('some_db')['some_collection'].find_one()
MongoDBHelper.run_method(...)¶
run_method() 方法用于获取数据库列表或集合列表,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
method |
str | 必须 | 执行方法,枚举:list_database_names :列出数据库list_collection_names :列出集合 |
db_name |
str | None |
执行list_collection_names 时可传递,指定数据库;不传递则为默认数据库 必须以命名参数方式传递 |
示例如下:
db_list = helper.run_method('list_database_names')
collection_list = helper.run_method('list_collection_names')
collection_list = helper.run_method('list_collection_names', db_name='some_db')
具体查询语法、格式等,请参考 mongoDB 官方文档
13. elasticsearch¶
elasticsearch 连接器操作对象主要提供一些操作 elasticsearch 的方法。
DFF.SRC(...)
参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
data_source_id |
str | 必须 | 连接器 ID |
ElasticSearchHelper.query(...)¶
query(...)
方法用于向 elasticsearch 发送 HTTP 请求,参数如下:
参数 | 类型 | 必须/默认值 | 说明 |
---|---|---|---|
method |
str | 必须 | 请求方法:GET ,POST 等 |
path |
str | None |
请求路径 |
query |
dict | None |
请求 URL 参数 |
body |
dict | None |
请求体 |
示例如下:
db_res = db.query('GET', '/some_index/_search?q=some_field:something')
db_res = db.query('GET', '/some_index/_search', query={...})
db_res = db.query('GET', '/some_index/_search', body={...})
具体查询语法、格式等,请参考 elasticsearch 官方文档