# -*- coding: utf-8 -*-
"""db_to_pbi.py

This module implements the following three functions:

* **derive_bim**: Generate and download a `Tabular Model <https://github.com/otykier/TabularEditor/wiki/Power-BI-Desktop-Integration>`__ ``.bim`` file based on `ResultSets of a Stored Procedure <https://github.com/DataBooster/DbWebApi/wiki#http-response>`__ - data (multiple tables);
* **deploy_dataset**: Create a pushable dataset (or update the metadata and schema for existing tables) in the Power BI Service from a `Tabular Model <https://github.com/otykier/TabularEditor/wiki/Power-BI-Desktop-Integration>`__ ``.bim`` file;
* **push_data**: Push all `ResultSets of a Stored Procedure <https://github.com/DataBooster/DbWebApi/wiki#http-response>`__ - data for multiple tables into a Power BI Push Dataset.

This module was originally shipped as example code from https://github.com/DataBooster/PyWebApi, licensed under the MIT license.
Anyone who obtains a copy of this code is welcome to modify it for any purpose, and holds all rights to the modified part only.
The above license notice and permission notice shall be included in all copies or substantial portions of the Software.
"""
from bottle import response
from os import path as os_path
from urllib.parse import urlparse
from collections.abc import Mapping
from json import loads as json_decode
from simple_rest_call import rest
from pbi_authorization import get_accesstoken
from powerbi_push_datasets import PushDatasetsMgmt, derive_bim_from_resultsets

def _invoke_sp(sp_url:str, sp_args:dict, sp_timeout:float) -> dict:
    # Call the stored procedure through dbwebapi and validate the shape of the response.

    def check_dbwebapi(result:dict) -> bool:
        # A dbwebapi response always carries these three top-level keys.
        if not isinstance(result, Mapping):
            return False
        if 'ResultSets' in result and 'OutputParameters' in result and 'ReturnValue' in result:
            return True
        else:
            return False

    result = rest(sp_url, sp_args, timeout=sp_timeout)

    if result and not check_dbwebapi(result):
        raise TypeError(f"{repr(sp_url)} is not a dbwebapi call")

    result_sets = result['ResultSets']
    count_sets = len(result_sets)

    # The first result set must contain exactly one row per subsequent result set.
    if count_sets < 2 or len(result_sets[0]) != count_sets - 1:
        raise ValueError("the first result set must be used to indicate the corresponding Power BI table name and optional push sequence number for all subsequent result sets")

    return result
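
# For reference, _invoke_sp expects a dbwebapi-style JSON response roughly shaped like the
# sketch below (column names and values are illustrative; the code only relies on the three
# top-level keys and on the first result set listing one row per subsequent result set):
#
#   {
#       "ResultSets": [
#           [ {"TableName": "Sales", "SeqNum": 20200731},
#             {"TableName": "Regions", "SeqNum": 20200731} ],
#           [ ...rows for the "Sales" table... ],
#           [ ...rows for the "Regions" table... ]
#       ],
#       "OutputParameters": { ... },
#       "ReturnValue": 0
#   }
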
def _get_table_name_seq_list(first_result_set:list, name_only:bool=False) -> list:
    # The first string-typed column of the first result set holds the Power BI table name;
    # the first numeric column (if any, and unless name_only) holds the push sequence number.

    def cast_to_int(seq_num:float) -> int:
        if seq_num is None:
            return None
        if isinstance(seq_num, int):
            return seq_num
        else:
            return int(seq_num)

    table_name_column = next(col_name for col_name, col_value in first_result_set[0].items() if isinstance(col_value, str))

    if name_only:
        seq_num_column = None
    else:
        seq_num_column = next((col_name for col_name, col_value in first_result_set[0].items() if isinstance(col_value, (int, float))), None)

    if seq_num_column:
        return [(row[table_name_column], cast_to_int(row.get(seq_num_column))) for row in first_result_set]
    else:
        return [row[table_name_column] for row in first_result_set]
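
# For example (illustrative data), given the first result set
#   [{"TableName": "Sales", "SeqNum": 1}, {"TableName": "Regions", "SeqNum": 2}]
# this returns [("Sales", 1), ("Regions", 2)], or ["Sales", "Regions"] when name_only=True.
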
def _extract_sp_name(sp_url:str) -> str:
    # Take the segment after the last '.' in the URL path, trimmed at the next '/',
    # as the default dataset name.
    sp_path = urlparse(sp_url).path
    _, _, sp_name = sp_path.rpartition('.')
    sp_name, _, _ = sp_name.partition('/')
    return sp_name
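
# For example (illustrative URL), a stored procedure URL whose path ends with
# ".../pkg_sales.get_summary/json" yields "get_summary" as the default dataset name.
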
def derive_bim(sp_url:str, sp_args:dict=None, dataset_name:str=None, timeout:float=1800) -> dict:
    """Generate and download a `Tabular Model <https://github.com/otykier/TabularEditor/wiki/Power-BI-Desktop-Integration>`__ ``.bim`` file based on `ResultSets of a Stored Procedure <https://github.com/DataBooster/DbWebApi/wiki#http-response>`__ - data (multiple tables).

    The first result set of the stored procedure must be used to indicate the corresponding Power BI table name in the Push Dataset for all subsequent result sets.
    """
    if not dataset_name:
        dataset_name = _extract_sp_name(sp_url)

    result = _invoke_sp(sp_url, sp_args, timeout)
    metadata = result['ResultSets'].pop(0)
    table_names = _get_table_name_seq_list(metadata, True)

    bim = derive_bim_from_resultsets(result['ResultSets'], table_names, dataset_name)

    response.set_header('Content-Disposition', f'attachment; filename="{dataset_name}.bim"')
    return bim
def deploy_dataset(model_bim:Mapping, dataset_name:str=None, workspace:str=None):
    """Create a pushable dataset (or update the metadata and schema for existing tables) in the Power BI Service from a `Tabular Model <https://github.com/otykier/TabularEditor/wiki/Power-BI-Desktop-Integration>`__ ``.bim`` file.
    """
    if isinstance(model_bim, str):
        # Accept either a path to a .bim file or a JSON string.
        if 8 < len(model_bim) < 200 and model_bim[-4:].lower() == '.bim' and os_path.exists(model_bim):
            with open(model_bim, 'r') as bim_file:
                model_bim = bim_file.read()
        model_bim = json_decode(model_bim)

    if not isinstance(model_bim, Mapping):
        raise ValueError("model_bim argument is not valid JSON")

    if not dataset_name:
        dataset_name = model_bim.get("name")
        if not dataset_name:
            raise ValueError("dataset_name argument is missing")

    access_token = get_accesstoken()
    pd_mgmt = PushDatasetsMgmt(access_token)

    return pd_mgmt.deploy_dataset(model_bim, dataset_name, workspace)
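
# Illustrative calls (paths and names are placeholders): deploy_dataset accepts an
# already-decoded Tabular Model dict, a JSON string, or a path to a ".bim" file:
#
#   deploy_dataset(bim_dict, "SalesSummary", workspace="Demo Workspace")
#   deploy_dataset(r"C:\models\SalesSummary.bim", workspace="Demo Workspace")
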
def push_data(sp_url:str, sp_args:dict=None, dataset_name:str=None, workspace:str=None, timeout:float=1800):
    """Push all `ResultSets of a Stored Procedure <https://github.com/DataBooster/DbWebApi/wiki#http-response>`__ - data for multiple tables into a Power BI Push Dataset.

    The first result set of the stored procedure must be used to indicate the corresponding Power BI table name in the Push Dataset for all subsequent result sets,
    and (optionally) the Sequence Number for the corresponding table if you need to enable the **X-PowerBI-PushData-SequenceNumber** feature -
    a built-in mechanism to track which rows have been successfully pushed.
    """
    if not dataset_name:
        dataset_name = _extract_sp_name(sp_url)

    result = _invoke_sp(sp_url, sp_args, timeout)
    metadata = result['ResultSets'].pop(0)
    table_name_seq_list = _get_table_name_seq_list(metadata, False)

    access_token = get_accesstoken()
    pd_mgmt = PushDatasetsMgmt(access_token)

    pd_mgmt.push_tables(result['ResultSets'], table_name_seq_list, dataset_name, workspace)

    #table_names = [tbn for tbn, seq in table_name_seq_list]
    #return pd_mgmt.get_sequence_numbers(table_names, dataset_name, workspace)
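
# A possible follow-up (hedged sketch): after push_data completes, the sequence numbers
# recorded for the pushed tables can be read back via get_seq_num below, e.g.
#
#   get_seq_num(["Sales", "Regions"], "SalesSummary", workspace="Demo Workspace")
#
# Table and dataset names here are the same placeholders used above.
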
def get_seq_num(table_names:list, dataset_name:str, workspace:str=None):
    # Retrieve the push sequence numbers recorded for the specified tables.
    access_token = get_accesstoken()
    pd_mgmt = PushDatasetsMgmt(access_token)
    return pd_mgmt.get_sequence_numbers(table_names, dataset_name, workspace)
def truncate(table_names:list, dataset_name:str, workspace:str=None):
    # Clear all rows from the specified tables in the Push Dataset.
    access_token = get_accesstoken()
    pd_mgmt = PushDatasetsMgmt(access_token)
    pd_mgmt.truncate_tables(table_names, dataset_name, workspace)

__version__ = "0.1a0.dev1"