postpanda_helper package¶
Submodules¶
postpanda_helper.geo_helpers module¶
- postpanda_helper.geo_helpers.convert_geometry_for_postgis(frame: pandas.core.frame.DataFrame, column: str, in_place: bool = False) Tuple[Optional[pandas.core.frame.DataFrame], MutableMapping[str, sqlalchemy.sql.type_api.UserDefinedType]] [source]¶
- postpanda_helper.geo_helpers.df_to_shape(tbl: sqlalchemy.sql.schema.Table, frame: pandas.core.frame.DataFrame) None [source]¶
- postpanda_helper.geo_helpers.fill_geoseries(s: pandas.core.series.Series) Tuple[pandas.core.series.Series, bool] [source]¶
postpanda_helper.normalizer module¶
- postpanda_helper.normalizer.normalizer(frame: pandas.core.frame.DataFrame, spec_file: Union[str, os.PathLike], substituter: postpanda_helper.select_sert.SelectSert) list[pandas.core.frame.DataFrame] [source]¶
Normalizes a dataframe based on a YAML spec file.
Modifies the dataframe in place.
YAML Example:
- action: replace_id
  columns:
    - datasource_id
    - units
  delete_old: false
  new_col_name: unit_id
  sql_column_names:
    - datasource_id
    - name
  table_name: units
- action: replace_id
  columns: f
  delete_old: false
  new_col_name: frequency_id
  sql_column_names: name
  table_name: frequency
- action: apply_funcs
  to_exec:
    keys:
      - functions
    # language=Python
    code: |
      from functools import partial

      import pandas as pd

      from postpanda_helper.pd_helpers import to_date

      def col_to_pt(s: pd.Series):
          import geopandas as gpd

          latlon = s.str.split(",", expand=True)
          out = gpd.GeoSeries(gpd.points_from_xy(latlon[1], latlon[0], crs="WGS84"))
          out[s.isna()] = None
          return out

      functions = {
          None: [
              partial(to_date, freq_col="f", date_col="end", start_or_end="end"),
              partial(to_date, freq_col="f", date_col="start", start_or_end="start"),
          ],
          "last_updated": lambda x: pd.to_datetime(x, utc=True),
          "latlon": col_to_pt,
          "latlon2": col_to_pt,
      }
- action: drop_cols
  to_drop:
    - units
    - iso3166
    - unitsshort
    - lon
    - lat
    - lon2
    - lat2
- action: drop_na_cols
- action: rename_cols
  name_map:
    end: end_date
    start: start_date
- action: extra_to_json
  excluded:
    - f
    - series_id
    - name
    - description
    - start_date
    - end_date
    - last_updated
    - geoset_id
    - data
    - datasource_id
    - frequency_id
    - source_id
    - unit_id
    - geography_id
    - geography2_id
    - latlon
    - latlon2
  json_column: extra_data
- action: many_to_many
  columns:
    - tag
    - geography_id
  table: x_data_points_geography
- action: combine_to_tuples
  delete_old: true
  columns:
    charge_dep_gal_100mile:
      - city_cd
      - highway_cd
      - combined_cd
    elec_comp_kwh_100mile:
      - city_e
      - highway_e
      - comb_e
- Parameters
frame – frame to modify
spec_file – path to YAML file
substituter – SelectSert instance used to replace values with lookup-table ids
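Each entry in the spec's action list is dispatched by its action name, with the remaining keys passed to the matching handler. A minimal sketch of that dispatch loop, using a plain dict as a stand-in frame (the handler names and internals here are illustrative assumptions, not the package's actual implementation):

```python
from typing import Any, Callable, Mapping, Sequence

# Hypothetical handlers -- the real normalizer's internals are not shown in
# these docs, so these are illustrative stand-ins operating on a plain dict.
def drop_cols(frame: dict, *, to_drop: Sequence[str]) -> None:
    for col in to_drop:
        frame.pop(col, None)

def rename_cols(frame: dict, *, name_map: Mapping[str, str]) -> None:
    for old, new in name_map.items():
        if old in frame:
            frame[new] = frame.pop(old)

HANDLERS: Mapping[str, Callable[..., None]] = {
    "drop_cols": drop_cols,
    "rename_cols": rename_cols,
}

def run_spec(frame: dict, spec: Sequence[Mapping[str, Any]]) -> None:
    """Apply each action entry in order, passing its remaining keys as kwargs."""
    for entry in spec:
        entry = dict(entry)  # copy so we can pop the action name
        action = entry.pop("action")
        HANDLERS[action](frame, **entry)
```

With the real normalizer, the frame is a pandas.DataFrame modified in place and actions such as replace_id additionally consult the SelectSert substituter.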
postpanda_helper.pandas_reader module¶
- class postpanda_helper.pandas_reader.PandasReader(*args, queue_max_size: int = 3, get_checksum: bool = False, column_to_uuid: Optional[Mapping[str, str]] = None, **kwargs)[source]¶
Bases: logger_mixin.LoggerMixin, multiprocessing.context.Process
Multiprocess-based CSV reader
It can also either convert a column to a UUID (128-bit, hex-encoded) by simply writing it as a zero-padded hex value, or calculate a checksum (as a UUID) for the whole line to use as a unique primary key.
The checksum is seeded with the input filename and uses the blake2b() hash algorithm.
- Parameters
*args – positional arguments to pass to pandas.read_csv()
queue_max_size – maximum queue size
get_checksum – whether to compute a checksum for each line
column_to_uuid – which column to convert to a UUID, e.g. {'original_column': 'original_col_name', 'new_column': 'new_col_name'}
**kwargs – keyword arguments to pass to pandas.read_csv()
- property queue: multiprocessing.context.BaseContext.Queue¶
A queue containing the chunks that have been read
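The checksum scheme described above can be sketched with the standard library alone. The docs only state that the hash is seeded with the input filename and uses blake2b; the truncation to 128 bits and the zero-padded hex conversion below are illustrative assumptions:

```python
import hashlib
import uuid

def line_checksum(filename: str, line: bytes) -> uuid.UUID:
    """Checksum a CSV line as a UUID: blake2b seeded with the filename and
    truncated to 128 bits (digest_size=16). Everything beyond 'seeded with
    the filename, uses blake2b' is an illustrative assumption."""
    h = hashlib.blake2b(filename.encode(), digest_size=16)
    h.update(line)
    return uuid.UUID(bytes=h.digest())

def int_column_to_uuid(value: int) -> uuid.UUID:
    """Write an integer column value as a zero-padded 32-char hex UUID."""
    return uuid.UUID(f"{value:032x}")
```

The same line in the same file always hashes to the same UUID, which is what makes it usable as a primary key.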
postpanda_helper.pandas_writer module¶
- class postpanda_helper.pandas_writer.PandasWriter(engine)[source]¶
Bases:
logger_mixin.LoggerMixin
Multiprocessing writer that writes to CSV files before copying them into the database
postpanda_helper.pd_helpers module¶
- postpanda_helper.pd_helpers.convert_df_dates_to_timestamps(frame: pandas.core.frame.DataFrame, inplace=False)[source]¶
- postpanda_helper.pd_helpers.df_chunker(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], size)[source]¶
- postpanda_helper.pd_helpers.downcast(data: pandas.core.series.Series) pandas.core.series.Series [source]¶
Downcasts integer types to the smallest possible dtype
- Parameters
data – series to downcast
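A rough equivalent of this downcasting can be written with pandas' built-in helper; this is a sketch of the behaviour, not necessarily the package's actual implementation:

```python
import pandas as pd

def downcast_sketch(data: pd.Series) -> pd.Series:
    """Shrink an integer series to the smallest dtype that holds its values."""
    return pd.to_numeric(data, downcast="integer")
```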
- postpanda_helper.pd_helpers.drop_duplicates_case_insensitive(data, *, subset=None, keep='first')[source]¶
- postpanda_helper.pd_helpers.fillna(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series])[source]¶
- postpanda_helper.pd_helpers.fillna_series(series: pandas.core.series.Series)[source]¶
- postpanda_helper.pd_helpers.get_common_initial_str(s: pandas.core.series.Series)[source]¶
- postpanda_helper.pd_helpers.interval_to_range(interval: pandas._libs.interval.Interval)[source]¶
- postpanda_helper.pd_helpers.is_date_interval(interval: pandas._libs.interval.Interval) bool [source]¶
- postpanda_helper.pd_helpers.map_to_bool(series: pandas.core.series.Series, true_val=None, false_val=None, fillna=False)[source]¶
- postpanda_helper.pd_helpers.period_to_interval(period: pandas._libs.tslibs.period.Period) pandas._libs.interval.Interval [source]¶
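A Period-to-Interval conversion presumably maps a period's start and end timestamps onto an interval; a sketch under that assumption (the closed side chosen here is also an assumption, not taken from the package):

```python
import pandas as pd

def period_to_interval_sketch(period: pd.Period) -> pd.Interval:
    """Map a Period to the Interval spanning its start and end timestamps.
    The 'closed' side is an illustrative assumption."""
    return pd.Interval(period.start_time, period.end_time, closed="both")
```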
- postpanda_helper.pd_helpers.ser_to_date(ser: pandas.core.series.Series, freq, start_or_end=None)[source]¶
- postpanda_helper.pd_helpers.strip_all_spaces(frame: Union[pandas.core.series.Series, pandas.core.frame.DataFrame])[source]¶
Strips all extra spaces (trims leading/trailing whitespace and collapses repeated internal spaces into one)
- Parameters
frame – series or frame of strings to clean
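Per string, the behaviour described above amounts to a trim plus a whitespace collapse; a stdlib sketch of the same rule applied to a single value:

```python
import re

def strip_extra_spaces(text: str) -> str:
    """Trim leading/trailing whitespace and collapse runs of spaces to one."""
    return re.sub(r" {2,}", " ", text.strip())
```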
- postpanda_helper.pd_helpers.to_date(frame: pandas.core.frame.DataFrame, freq_col, date_col, start_or_end=None, date_freq_map=None)[source]¶
- postpanda_helper.pd_helpers.to_lower(data) Union[pandas.core.series.Series, pandas.core.frame.DataFrame] [source]¶
- postpanda_helper.pd_helpers.to_string_tuples(frame)[source]¶
Converts frame columns to combined tuple string
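The exact string format to_string_tuples emits is not shown in these docs; a hypothetical per-row sketch, mirroring the shape of PostgreSQL's composite-value literals:

```python
def row_to_tuple_string(values) -> str:
    """Combine values into a tuple-literal string such as '(1,x,)'.
    The exact quoting and null handling are illustrative assumptions."""
    return "(" + ",".join("" if v is None else str(v) for v in values) + ")"
```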
- postpanda_helper.pd_helpers.unfillna(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series])[source]¶
- postpanda_helper.pd_helpers.unfillna_series(series: pandas.core.series.Series)[source]¶
postpanda_helper.psql_helpers module¶
- class postpanda_helper.psql_helpers.PandaCSVtoSQL(dframe: pandas.core.frame.DataFrame, table, engine=None, primary_key=None, schema=None, chunksize=1000000, index=False, create_table=True, create_primary_key=True, **kwargs)[source]¶
Bases:
object
- property primary_key¶
- property sa_table¶
- postpanda_helper.psql_helpers.create_df_table_altered(frame, name, con, primary_keys, schema=None, index=True, index_label=None, dtype=None)[source]¶
- postpanda_helper.psql_helpers.pd_to_sql(frame: pandas.core.frame.DataFrame, name: str, con, schema: Optional[str] = None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype: Optional[MutableMapping[str, Any]] = None, method=None)[source]¶
With support for daterange and tsrange
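Supporting daterange and tsrange implies serializing interval-like values into PostgreSQL range literals; a sketch of that literal format (the function's actual serialization path through SQLAlchemy is not shown here):

```python
import datetime

def to_daterange_literal(start: datetime.date, end: datetime.date,
                         bounds: str = "[)") -> str:
    """Render a PostgreSQL daterange literal, e.g. '[2020-01-01,2020-02-01)'.
    The default half-open bounds match PostgreSQL's canonical form."""
    return f"{bounds[0]}{start.isoformat()},{end.isoformat()}{bounds[1]}"
```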
postpanda_helper.select_sert module¶
- class postpanda_helper.select_sert.SelectSert(connection: Union[str, sqlalchemy.engine.base.Engine], default_schema: Optional[str] = None)[source]¶
Bases:
object
- Parameters
connection – Connection string or sqlalchemy Engine
default_schema – schema where the lookup tables are located
- replace_multiple(frame: pandas.core.frame.DataFrame, replace_spec: Sequence[Mapping[str, Any]]) None [source]¶
- replace_with_ids(frame: pandas.core.frame.DataFrame, columns: Union[Sequence[str], str], table_name: str, new_col_name: str, *, sql_column_names: Optional[Union[Sequence[str], str]] = None, sql_id_name: str = 'id', case_insensitive: bool = True, delete_old: bool = True, sql_columns_notnull: Optional[Sequence[str]] = None, column_split: Optional[Mapping[str, str]] = None, schema: Optional[str] = None, filter_columns: Optional[Sequence[str]] = None) None [source]¶
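replace_with_ids follows a select-or-insert ("selectsert") lookup pattern: fetch existing ids from the lookup table, insert rows for values not yet present, then swap the frame's values for ids. A self-contained sketch of that pattern using sqlite3 (the real class works through SQLAlchemy against Postgres; table layout and names here are illustrative):

```python
import sqlite3

def selectsert(conn: sqlite3.Connection, table: str, values: list) -> dict:
    """Return {value: id}, inserting rows for values not already present.
    Table/column names are interpolated directly only because this is a
    sketch with trusted identifiers."""
    cur = conn.cursor()
    mapping = {}
    for v in values:
        row = cur.execute(f"SELECT id FROM {table} WHERE name = ?", (v,)).fetchone()
        if row is None:
            cur.execute(f"INSERT INTO {table} (name) VALUES (?)", (v,))
            mapping[v] = cur.lastrowid
        else:
            mapping[v] = row[0]
    conn.commit()
    return mapping
```

With the mapping in hand, the frame column is rewritten to ids under new_col_name, and the original column is dropped when delete_old is true.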
Module contents¶
- class postpanda_helper.PandaCSVtoSQL(dframe: pandas.core.frame.DataFrame, table, engine=None, primary_key=None, schema=None, chunksize=1000000, index=False, create_table=True, create_primary_key=True, **kwargs)[source]¶
Bases:
object
- property primary_key¶
- property sa_table¶
- class postpanda_helper.PandasReader(*args, queue_max_size: int = 3, get_checksum: bool = False, column_to_uuid: Optional[Mapping[str, str]] = None, **kwargs)[source]¶
Bases: logger_mixin.LoggerMixin, multiprocessing.context.Process
Multiprocess-based CSV reader
It can also either convert a column to a UUID (128-bit, hex-encoded) by simply writing it as a zero-padded hex value, or calculate a checksum (as a UUID) for the whole line to use as a unique primary key.
The checksum is seeded with the input filename and uses the blake2b() hash algorithm.
- Parameters
*args – positional arguments to pass to pandas.read_csv()
queue_max_size – maximum queue size
get_checksum – whether to compute a checksum for each line
column_to_uuid – which column to convert to a UUID, e.g. {'original_column': 'original_col_name', 'new_column': 'new_col_name'}
**kwargs – keyword arguments to pass to pandas.read_csv()
- property queue: multiprocessing.context.BaseContext.Queue¶
A queue containing the chunks that have been read
- class postpanda_helper.PandasWriter(engine)[source]¶
Bases:
logger_mixin.LoggerMixin
Multiprocessing writer that writes to CSV files before copying them into the database
- class postpanda_helper.SelectSert(connection: Union[str, sqlalchemy.engine.base.Engine], default_schema: Optional[str] = None)[source]¶
Bases:
object
- Parameters
connection – Connection string or sqlalchemy Engine
default_schema – schema where the lookup tables are located
- replace_multiple(frame: pandas.core.frame.DataFrame, replace_spec: Sequence[Mapping[str, Any]]) None [source]¶
- replace_with_ids(frame: pandas.core.frame.DataFrame, columns: Union[Sequence[str], str], table_name: str, new_col_name: str, *, sql_column_names: Optional[Union[Sequence[str], str]] = None, sql_id_name: str = 'id', case_insensitive: bool = True, delete_old: bool = True, sql_columns_notnull: Optional[Sequence[str]] = None, column_split: Optional[Mapping[str, str]] = None, schema: Optional[str] = None, filter_columns: Optional[Sequence[str]] = None) None [source]¶