-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_year.yml
More file actions
190 lines (169 loc) · 10.8 KB
/
load_year.yml
File metadata and controls
190 lines (169 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
id: load_year
namespace: dezc
description: |
Sub-workflow to process a single year of data.
Data comes from the following API endpoint:
https://analisi.transparenciacatalunya.cat/Seguretat/V-ctimes-o-persones-autores-de-delictes-d-odi-i-di/gci6-2ubm
variables:
year : "{{trigger.date ?? execution.startDate | date('yyyy')}}"
table_basename : "crims"
file_basename : "{{vars.table_basename}}-{{render(vars.year)}}"
api_file : "{{vars.file_basename}}.csv"
gcs_file : "gs://{{envs.gcp_bucket_name}}/{{vars.api_file}}"
external_table_name: "{{envs.gcp_project_id}}.{{envs.gcp_dataset_name}}.{{vars.file_basename}}_ext"
enriched_table_name: "{{envs.gcp_project_id}}.{{envs.gcp_dataset_name}}.{{vars.file_basename}}_enriched"
table_name : "{{envs.gcp_project_id}}.{{envs.gcp_dataset_name}}.{{vars.table_basename}}"
tasks:
- id: set_label
type: io.kestra.plugin.core.execution.Labels
labels:
year: "{{render(vars.year)}}"
- id: parallel_main
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: data_load_sequence
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: api_query
type: io.kestra.plugin.scripts.python.Script
beforeCommands:
- pip install sodapy pandas
outputFiles:
- "{{render(vars.api_file)}}"
script: |
import logging
import pandas as pd
from sodapy import Socrata
logging.getLogger().setLevel(logging.ERROR)
year = {{render(vars.year)}}
with Socrata("analisi.transparenciacatalunya.cat", None) as client:
# All results, returned as JSON from API / converted to Python list of
# dictionaries by sodapy.
results = client.get_all("gci6-2ubm", where=f"any == {year}", limit=2000)
# Convert to pandas DataFrame
results_df = pd.DataFrame.from_records(results)
results_df.to_csv("{{render(vars.api_file)}}", header=True, index=False)
- id: upload_to_gcs
type: io.kestra.plugin.gcp.gcs.Upload
from: "{{outputs.api_query.outputFiles[render(vars.api_file)]}}"
to: "{{render(vars.gcs_file)}}"
- id: bq_create_external_table
type: io.kestra.plugin.gcp.bigquery.Query
sql: |
CREATE OR REPLACE EXTERNAL TABLE `{{render(vars.external_table_name)}}`
(
any_ NUMERIC OPTIONS (description = 'Número de l\'any de les dades'),
num_mes NUMERIC OPTIONS (description = 'Número del mes de les dades'),
nom_mes STRING OPTIONS (description = 'Nom del mes de les dades'),
regio_policial_rp STRING OPTIONS (description = 'Nom de la Regió policial on s\'han produït els fets'),
area_basica_policial_abp STRING OPTIONS (description = 'Nom de l\'Area Basica Policial'),
provincia STRING OPTIONS (description = 'Província on s\'han produït els fets'),
comarca STRING OPTIONS (description = 'Comarca on s\'han produït els fets'),
municipi STRING OPTIONS (description = 'Municipi on s\'han produït els fets.'),
tipus_de_lloc_dels_fets STRING OPTIONS (description = 'Tipus del lloc on s\'han produït els fets (punt de la via, tipus d\'establiment, tipus de local, etc.)'),
tipus_de_fet STRING OPTIONS (description = 'Àmbit penal (delictes) o infraccions administratives'),
tipus_de_fet_codi_penal_o STRING OPTIONS (description = 'Detall del tipus de fet segons el codi penal (2n nivell) ) o les lleis relacionades amb l\'odi i la discriminació'),
ambit_procediment_fet STRING OPTIONS (description = 'Motivació que origina el fet o classificació del fet d\'odi i discriminació'),
rol_victima_o_autoria STRING OPTIONS (description = 'Víctima o persona autora dels fets relacionats amb odi i discriminació'),
sexe STRING OPTIONS (description = 'Home o dona.'),
edat_inici_fets NUMERIC OPTIONS (description = 'Edat de la víctima o de la persona autora quan es van iniciar els fets'),
nombre NUMERIC OPTIONS (description = 'Nombre de víctimes de cadascun dels fets relacionats amb l\'odi i la discriminació')
)
OPTIONS (
format = 'CSV',
uris = ['{{render(vars.gcs_file)}}'],
skip_leading_rows = 1,
ignore_unknown_values = TRUE
);
- id: bq_enrich_external_table
type: io.kestra.plugin.gcp.bigquery.Query
sql: |
CREATE OR REPLACE TABLE `{{render(vars.enriched_table_name)}}`
AS
SELECT
MD5(
CONCAT(
COALESCE(CAST(any_ AS STRING), ""),
COALESCE(CAST(num_mes AS STRING), ""),
COALESCE(CAST(municipi AS STRING), ""),
COALESCE(CAST(tipus_de_lloc_dels_fets AS STRING), ""),
COALESCE(CAST(tipus_de_fet AS STRING), ""),
COALESCE(CAST(tipus_de_fet_codi_penal_o AS STRING), ""),
COALESCE(CAST(ambit_procediment_fet AS STRING), ""),
COALESCE(CAST(rol_victima_o_autoria AS STRING), ""),
COALESCE(CAST(sexe AS STRING), ""),
COALESCE(CAST(edat_inici_fets AS STRING), ""),
COALESCE(CAST(nombre AS STRING), "")
)
) AS unique_row_id,
CAST(
CONCAT(
COALESCE(CAST(any_ AS STRING), ""),
"-",
COALESCE(CAST(num_mes AS STRING), ""),
"-01"
) AS DATE
) AS data_particio,
*
FROM `{{render(vars.external_table_name)}}`;
- id: bq_create_table
type: io.kestra.plugin.gcp.bigquery.Query
sql: |
CREATE TABLE IF NOT EXISTS `{{render(vars.table_name)}}`
(
unique_row_id BYTES OPTIONS (description = 'A unique identifier for the crime, generated by hashing key crime attributes.'),
any_ NUMERIC OPTIONS (description = 'Número de l\'any de les dades'),
num_mes NUMERIC OPTIONS (description = 'Número del mes de les dades'),
nom_mes STRING OPTIONS (description = 'Nom del mes de les dades'),
data_particio DATE OPTIONS (description = 'Data per particionar'),
regio_policial_rp STRING OPTIONS (description = 'Nom de la Regió policial on s\'han produït els fets'),
area_basica_policial_abp STRING OPTIONS (description = 'Nom de l\'Area Basica Policial'),
provincia STRING OPTIONS (description = 'Província on s\'han produït els fets'),
comarca STRING OPTIONS (description = 'Comarca on s\'han produït els fets'),
municipi STRING OPTIONS (description = 'Municipi on s\'han produït els fets.'),
tipus_de_lloc_dels_fets STRING OPTIONS (description = 'Tipus del lloc on s\'han produït els fets (punt de la via, tipus d\'establiment, tipus de local, etc.)'),
tipus_de_fet STRING OPTIONS (description = 'Àmbit penal (delictes) o infraccions administratives'),
tipus_de_fet_codi_penal_o STRING OPTIONS (description = 'Detall del tipus de fet segons el codi penal (2n nivell) ) o les lleis relacionades amb l\'odi i la discriminació'),
ambit_procediment_fet STRING OPTIONS (description = 'Motivació que origina el fet o classificació del fet d\'odi i discriminació'),
rol_victima_o_autoria STRING OPTIONS (description = 'Víctima o persona autora dels fets relacionats amb odi i discriminació'),
sexe STRING OPTIONS (description = 'Home o dona.'),
edat_inici_fets NUMERIC OPTIONS (description = 'Edat de la víctima o de la persona autora quan es van iniciar els fets'),
nombre NUMERIC OPTIONS (description = 'Nombre de víctimes de cadascun dels fets relacionats amb l\'odi i la discriminació')
)
PARTITION BY data_particio
CLUSTER BY ambit_procediment_fet;
- id: bq_merge
type: io.kestra.plugin.gcp.bigquery.Query
sql: |
MERGE INTO `{{render(vars.table_name)}}` T
USING `{{render(vars.enriched_table_name)}}` S
ON T.unique_row_id = S.unique_row_id
WHEN NOT MATCHED THEN
INSERT (unique_row_id, any_, num_mes, nom_mes, data_particio, regio_policial_rp, area_basica_policial_abp, provincia, comarca, municipi, tipus_de_lloc_dels_fets, tipus_de_fet, tipus_de_fet_codi_penal_o, ambit_procediment_fet, rol_victima_o_autoria, sexe, edat_inici_fets, nombre)
VALUES (S.unique_row_id, S.any_, S.num_mes, S.nom_mes, S.data_particio, S.regio_policial_rp, S.area_basica_policial_abp, S.provincia, S.comarca, S.municipi, S.tipus_de_lloc_dels_fets, S.tipus_de_fet, S.tipus_de_fet_codi_penal_o, S.ambit_procediment_fet, S.rol_victima_o_autoria, S.sexe, S.edat_inici_fets, S.nombre);
- id: parallel_cleanup
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: bq_delete_table_tmp
type: io.kestra.plugin.gcp.bigquery.DeleteTable
table: "{{(render(vars.enriched_table_name) | split('\\.', 3))[2]}}"
- id: bq_delete_table_ext
type: io.kestra.plugin.gcp.bigquery.DeleteTable
table: "{{(render(vars.external_table_name) | split('\\.', 3))[2]}}"
- id: delete_from_gcs
type: io.kestra.plugin.gcp.gcs.Delete
uri: "{{render(vars.gcs_file)}}"
- id: purge_files
type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
description: To avoid cluttering your storage, we will remove the downloaded files
pluginDefaults:
- type: io.kestra.plugin.gcp
values:
serviceAccount: "{{secret('GCP_SERVICE_ACCOUNT')}}"
projectId: "{{envs.gcp_project_id}}"
bucket: "{{envs.gcp_bucket_name}}"
dataset: "{{envs.gcp_dataset_name}}"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "@annually"