postpanda_helper package

Submodules

postpanda_helper.geo_helpers module

postpanda_helper.geo_helpers.convert_geometry_for_postgis(frame: pandas.core.frame.DataFrame, column: str, in_place: bool = False) → Tuple[Optional[pandas.core.frame.DataFrame], MutableMapping[str, sqlalchemy.sql.type_api.UserDefinedType]][source]
postpanda_helper.geo_helpers.df_to_shape(tbl: sqlalchemy.sql.schema.Table, frame: pandas.core.frame.DataFrame) → None[source]
postpanda_helper.geo_helpers.fill_geoseries(s: pandas.core.series.Series) → Tuple[pandas.core.series.Series, bool][source]

postpanda_helper.normalizer module

postpanda_helper.normalizer.handle_exec(obj)[source]
postpanda_helper.normalizer.load_spec(file_path)[source]
postpanda_helper.normalizer.normalizer(frame: pandas.core.frame.DataFrame, spec_file: Union[str, os.PathLike], substituter: postpanda_helper.select_sert.SelectSert) → list[pandas.core.frame.DataFrame][source]

Normalizes a dataframe based on a YAML spec file.

Modifies the dataframe in place.

YAML Example:

- action:       replace_id
  columns:
    - datasource_id
    - units
  delete_old:   false
  new_col_name: unit_id
  sql_column_names:
    - datasource_id
    - name
  table_name:   units

- action:           replace_id
  columns:          f
  delete_old:       false
  new_col_name:     frequency_id
  sql_column_names: name
  table_name:       frequency

- action:  apply_funcs
  to_exec:
    keys:
      - functions
    # language=Python
    code: |
           from functools import partial
           import pandas as pd
           from postpanda_helper.pd_helpers import to_date


           def col_to_pt(s: pd.Series):
               import geopandas as gpd

               latlon = s.str.split(",", expand=True)
               out = gpd.GeoSeries(gpd.points_from_xy(latlon[1], latlon[0], crs="WGS84"))
               out[s.isna()] = None
               return out


           functions = {
               None: [
                   partial(to_date, freq_col="f", date_col="end", start_or_end="end"),
                   partial(to_date, freq_col="f", date_col="start", start_or_end="start"),
               ],
               "last_updated": lambda x: pd.to_datetime(x, utc=True),
               "latlon": col_to_pt,
               "latlon2": col_to_pt,
           }

- action: drop_cols
  to_drop:
    - units
    - iso3166
    - unitsshort
    - lon
    - lat
    - lon2
    - lat2
- action: drop_na_cols

- action: rename_cols
  name_map:
    end:   end_date
    start: start_date

- action:      extra_to_json
  excluded:
    - f
    - series_id
    - name
    - description
    - start_date
    - end_date
    - last_updated
    - geoset_id
    - data
    - datasource_id
    - frequency_id
    - source_id
    - unit_id
    - geography_id
    - geography2_id
    - latlon
    - latlon2
  json_column: extra_data

- action: many_to_many
  columns:
    - tag
    - geography_id
  table:  x_data_points_geography

- action: combine_to_tuples
  delete_old: true
  columns:
    charge_dep_gal_100mile:
      - city_cd
      - highway_cd
      - combined_cd
    elec_comp_kwh_100mile:
      - city_e
      - highway_e
      - comb_e
Parameters
  • frame – frame to modify

  • spec_file – path to the YAML spec file

  • substituter – SelectSert instance used to replace column values with lookup-table IDs
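
A minimal usage sketch; the input file, spec file, and connection string are placeholders, and reading the returned list as extra frames produced by actions such as many_to_many is an assumption:

import pandas as pd

from postpanda_helper import SelectSert
from postpanda_helper.normalizer import normalizer

frame = pd.read_csv("raw_series.csv")  # placeholder input
substituter = SelectSert("postgresql://user:pass@localhost/mydb", default_schema="lookup")

# `frame` is modified in place; the returned list holds any additional frames
# produced by the spec (e.g. many-to-many link tables).
extra_frames = normalizer(frame, "series_spec.yaml", substituter)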

postpanda_helper.normalizer.walk(obj)[source]
postpanda_helper.normalizer.walk_dict(obj)[source]
postpanda_helper.normalizer.walk_list(obj)[source]

postpanda_helper.pandas_reader module

class postpanda_helper.pandas_reader.PandasReader(*args, queue_max_size: int = 3, get_checksum: bool = False, column_to_uuid: Optional[Mapping[str, str]] = None, **kwargs)[source]

Bases: logger_mixin.LoggerMixin, multiprocessing.context.Process

Multiprocess-based CSV reader

In addition, it can either convert a column to a UUID (128-bit, hex encoded) by writing the value as a zero-padded hexadecimal string, or calculate a checksum (stored as a UUID) over the whole line to use as a unique primary key.

The checksum is seeded with the input filename and uses the blake2b() hash algorithm.

Parameters
  • *args – positional arguments to pass to pandas.read_csv()

  • queue_max_size – max queue size

  • get_checksum – should we find the checksum of the line?

  • column_to_uuid

    which column should we convert to a UUID?

    {'original_column': 'original_col_name', 'new_column': 'new_col_name'}

  • **kwargs – keyword arguments to pass to pandas.read_csv()

property queue: multiprocessing.context.BaseContext.Queue

A queue containing the chunks that have been read

run()[source]

Method to be run in sub-process; can be overridden in sub-class
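
A usage sketch; the file name and read_csv options are placeholders, passing chunksize to get chunked reads is an assumption about the intended configuration, and the None end-of-stream sentinel is an assumption rather than documented behaviour:

from postpanda_helper import PandasReader

reader = PandasReader(
    "big_file.csv",                 # positional args are passed to pandas.read_csv()
    chunksize=100_000,              # keyword args are passed to pandas.read_csv()
    queue_max_size=3,
    get_checksum=True,              # add a blake2b-based UUID checksum per line
    column_to_uuid={"original_column": "legacy_id", "new_column": "legacy_uuid"},
)
reader.start()                      # PandasReader is a multiprocessing.Process

while True:
    chunk = reader.queue.get()
    if chunk is None:               # assumed end-of-stream sentinel (not documented)
        break
    ...                             # work with each DataFrame chunk

reader.join()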

postpanda_helper.pandas_writer module

class postpanda_helper.pandas_writer.PandasWriter(engine)[source]

Bases: logger_mixin.LoggerMixin

Multiprocessing writer that writes to CSV files before copying them into the database

get_writer(schema, table)[source]
join(upsert=True, cleanup=True)[source]

Join and write SQL

Parameters
  • upsert – if rows might need to be overwritten, insert everything into a temporary table, delete the existing rows, and then copy into the main table

  • cleanup – should we delete the temporary files?

stop()[source]

Signal that writing to the queue is done and close it out.
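
A sketch using only the calls documented above; the connection URL, schema, and table are placeholders, how DataFrames are handed to the per-table writer returned by get_writer() is not documented here and is left as a comment, and calling stop() before join() is an assumption:

from sqlalchemy import create_engine

from postpanda_helper import PandasWriter

engine = create_engine("postgresql://user:pass@localhost/mydb")  # placeholder URL
writer = PandasWriter(engine)

handle = writer.get_writer("public", "data_points")  # per-(schema, table) writer
# ... hand DataFrame chunks to `handle` here; its interface is not documented above ...

writer.stop()                            # done writing to the queue
writer.join(upsert=True, cleanup=True)   # upsert via a temp table, then remove the temp CSVs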

postpanda_helper.pd_helpers module

postpanda_helper.pd_helpers.as_df(obj)[source]
postpanda_helper.pd_helpers.as_list(obj)[source]
postpanda_helper.pd_helpers.clean_frame(frame, case_insensitive=True)[source]
postpanda_helper.pd_helpers.convert_df_dates_to_timestamps(frame: pandas.core.frame.DataFrame, inplace=False)[source]
postpanda_helper.pd_helpers.df_chunker(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], size)[source]
postpanda_helper.pd_helpers.disable_copy_warning()[source]
postpanda_helper.pd_helpers.downcast(data: pandas.core.series.Series) → pandas.core.series.Series[source]

Downcasts integer types to the smallest possible type.
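
For illustration (the exact resulting dtype is an assumption based on "smallest possible type"):

import pandas as pd

from postpanda_helper.pd_helpers import downcast

s = pd.Series([1, 2, 300], dtype="int64")
small = downcast(s)   # expected to come back as a smaller integer dtype, e.g. int16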

postpanda_helper.pd_helpers.drop_duplicates_case_insensitive(data, *, subset=None, keep='first')[source]
postpanda_helper.pd_helpers.fillna(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series])[source]
postpanda_helper.pd_helpers.fillna_series(series: pandas.core.series.Series)[source]
postpanda_helper.pd_helpers.get_common_initial_str(s: pandas.core.series.Series)[source]
postpanda_helper.pd_helpers.get_max_chars_in_common(s)[source]
postpanda_helper.pd_helpers.interval_to_range(interval: pandas._libs.interval.Interval)[source]
postpanda_helper.pd_helpers.is_date_interval(interval: pandas._libs.interval.Interval) → bool[source]
postpanda_helper.pd_helpers.map_to_bool(series: pandas.core.series.Series, true_val=None, false_val=None, fillna=False)[source]
postpanda_helper.pd_helpers.period_to_interval(period: pandas._libs.tslibs.period.Period) → pandas._libs.interval.Interval[source]
postpanda_helper.pd_helpers.ser_to_date(ser: pandas.core.series.Series, freq, start_or_end=None)[source]
postpanda_helper.pd_helpers.strip_all_spaces(frame: Union[pandas.core.series.Series, pandas.core.frame.DataFrame])[source]

Strips all extra spaces (equivalent to trimming and collapsing doubled spaces)
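
For illustration (the expected output is inferred from the description above):

import pandas as pd

from postpanda_helper.pd_helpers import strip_all_spaces

s = pd.Series(["  New   York  ", "Boston"])
cleaned = strip_all_spaces(s)   # expected: ["New York", "Boston"]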

postpanda_helper.pd_helpers.to_date(frame: pandas.core.frame.DataFrame, freq_col, date_col, start_or_end=None, date_freq_map=None)[source]
postpanda_helper.pd_helpers.to_lower(data) → Union[pandas.core.series.Series, pandas.core.frame.DataFrame][source]
postpanda_helper.pd_helpers.to_string_tuples(frame)[source]

Converts frame columns to a combined tuple string

postpanda_helper.pd_helpers.to_string_tuples_drop_val(frame, value=0.0)[source]
postpanda_helper.pd_helpers.unfillna(frame: Union[pandas.core.frame.DataFrame, pandas.core.series.Series])[source]
postpanda_helper.pd_helpers.unfillna_series(series: pandas.core.series.Series)[source]
postpanda_helper.pd_helpers.value_to_nan(frame, value=0.0)[source]

postpanda_helper.psql_helpers module

class postpanda_helper.psql_helpers.PandaCSVtoSQL(dframe: pandas.core.frame.DataFrame, table, engine=None, primary_key=None, schema=None, chunksize=1000000, index=False, create_table=True, create_primary_key=True, **kwargs)[source]

Bases: object

get_col_names()[source]
get_full_tablename(quotes=True)[source]
async import_csv(csv_fh, use_temp_table=True, wait_on=None)[source]
import_csv_no_temp(csv_fh)[source]
property primary_key
async process(use_temp_table=True, wait_on=None)[source]
process_simple()[source]
property sa_table
async to_csv(frame=None, file_handle=None, seek=True)[source]
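
A sketch; the connection URL, table name, and sample frame are placeholders, and treating process_simple() as the plain synchronous path (with the async process() staging through a temporary table) is an assumption based on the method names and parameters:

import pandas as pd
from sqlalchemy import create_engine

from postpanda_helper import PandaCSVtoSQL

engine = create_engine("postgresql://user:pass@localhost/mydb")  # placeholder URL
frame = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

loader = PandaCSVtoSQL(frame, "my_table", engine=engine, primary_key="id", schema="public")
loader.process_simple()   # synchronous path

# The asynchronous path, which stages through a temporary table, is also available:
# import asyncio
# asyncio.run(loader.process(use_temp_table=True))
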
postpanda_helper.psql_helpers.chunker(seq, size)[source]
postpanda_helper.psql_helpers.conform_to_list(obj)[source]
postpanda_helper.psql_helpers.create_df_table_altered(frame, name, con, primary_keys, schema=None, index=True, index_label=None, dtype=None)[source]
postpanda_helper.psql_helpers.disable_reflection_warning()[source]
postpanda_helper.psql_helpers.pd_to_sql(frame: pandas.core.frame.DataFrame, name: str, con, schema: Optional[str] = None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype: Optional[MutableMapping[str, Any]] = None, method=None)[source]

With support for PostgreSQL daterange and tsrange types
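
A sketch of writing a frame with an interval column; the connection URL and table name are placeholders, and the assumption is that a pandas Interval column of Timestamps maps onto a PostgreSQL tsrange (or daterange) column:

import pandas as pd
from sqlalchemy import create_engine

from postpanda_helper.psql_helpers import pd_to_sql

engine = create_engine("postgresql://user:pass@localhost/mydb")  # placeholder URL
frame = pd.DataFrame(
    {
        "series_id": ["A", "B"],
        "valid": [
            pd.Interval(pd.Timestamp("2020-01-01"), pd.Timestamp("2020-12-31")),
            pd.Interval(pd.Timestamp("2021-01-01"), pd.Timestamp("2021-12-31")),
        ],
    }
)
pd_to_sql(frame, "coverage", engine, schema="public", if_exists="append", index=False)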

postpanda_helper.psql_helpers.possible_upsert(pdtable: pandas.io.sql.SQLTable, conn: sqlalchemy.engine.base.Connection, keys, data_iter)[source]
postpanda_helper.psql_helpers.psql_upsert(index_elements)[source]
postpanda_helper.psql_helpers.use_identity(element, compiler, **kw)[source]

postpanda_helper.select_sert module

class postpanda_helper.select_sert.SelectSert(connection: Union[str, sqlalchemy.engine.base.Engine], default_schema: Optional[str] = None)[source]

Bases: object

Parameters
  • connection – Connection string or sqlalchemy Engine

  • default_schema – schema where the lookup tables are located

replace_multiple(frame: pandas.core.frame.DataFrame, replace_spec: Sequence[Mapping[str, Any]]) → None[source]
replace_with_ids(frame: pandas.core.frame.DataFrame, columns: Union[Sequence[str], str], table_name: str, new_col_name: str, *, sql_column_names: Optional[Union[Sequence[str], str]] = None, sql_id_name: str = 'id', case_insensitive: bool = True, delete_old: bool = True, sql_columns_notnull: Optional[Sequence[str]] = None, column_split: Optional[Mapping[str, str]] = None, schema: Optional[str] = None, filter_columns: Optional[Sequence[str]] = None) → None[source]
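
A sketch mirroring the first replace_id action in the normalizer YAML example above; the connection string, schema, and sample frame are placeholders, and the select-or-insert behaviour implied by the class name is an assumption:

import pandas as pd

from postpanda_helper import SelectSert

subs = SelectSert("postgresql://user:pass@localhost/mydb", default_schema="lookup")
frame = pd.DataFrame({"datasource_id": [1, 1], "units": ["MWh", "GWh"]})

subs.replace_with_ids(
    frame,
    columns=["datasource_id", "units"],
    table_name="units",
    new_col_name="unit_id",
    sql_column_names=["datasource_id", "name"],
    delete_old=False,
)
# `frame` now carries a `unit_id` column; the original columns are kept
# because delete_old=False.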

Module contents

class postpanda_helper.PandaCSVtoSQL(dframe: pandas.core.frame.DataFrame, table, engine=None, primary_key=None, schema=None, chunksize=1000000, index=False, create_table=True, create_primary_key=True, **kwargs)[source]

Bases: object

get_col_names()[source]
get_full_tablename(quotes=True)[source]
async import_csv(csv_fh, use_temp_table=True, wait_on=None)[source]
import_csv_no_temp(csv_fh)[source]
property primary_key
async process(use_temp_table=True, wait_on=None)[source]
process_simple()[source]
property sa_table
async to_csv(frame=None, file_handle=None, seek=True)[source]
class postpanda_helper.PandasReader(*args, queue_max_size: int = 3, get_checksum: bool = False, column_to_uuid: Optional[Mapping[str, str]] = None, **kwargs)[source]

Bases: logger_mixin.LoggerMixin, multiprocessing.context.Process

Multiprocess-based CSV reader

In addition, it can either convert a column to a UUID (128-bit, hex encoded) by writing the value as a zero-padded hexadecimal string, or calculate a checksum (stored as a UUID) over the whole line to use as a unique primary key.

The checksum is seeded with the input filename and uses the blake2b() hash algorithm.

Parameters
  • *args – positional arguments to pass to pandas.read_csv()

  • queue_max_size – max queue size

  • get_checksum – should we find the checksum of the line?

  • column_to_uuid

    which column should we convert to a UUID?

    {'original_column': 'original_col_name', 'new_column': 'new_col_name'}

  • **kwargs – keyword arguments to pass to pandas.read_csv()

property queue: multiprocessing.context.BaseContext.Queue

A queue containing the chunks that have been read

run()[source]

Method to be run in sub-process; can be overridden in sub-class

class postpanda_helper.PandasWriter(engine)[source]

Bases: logger_mixin.LoggerMixin

Multiprocessing writer that writes to CSV files before copying them into the database

get_writer(schema, table)[source]
join(upsert=True, cleanup=True)[source]

Join and write SQL

Parameters
  • upsert – if rows might need to be overwritten, insert everything into a temporary table, delete the existing rows, and then copy into the main table

  • cleanup – should we delete the temporary files?

stop()[source]

Signal that writing to the queue is done and close it out.

class postpanda_helper.SelectSert(connection: Union[str, sqlalchemy.engine.base.Engine], default_schema: Optional[str] = None)[source]

Bases: object

Parameters
  • connection – Connection string or sqlalchemy Engine

  • default_schema – schema where the lookup tables are located

replace_multiple(frame: pandas.core.frame.DataFrame, replace_spec: Sequence[Mapping[str, Any]]) → None[source]
replace_with_ids(frame: pandas.core.frame.DataFrame, columns: Union[Sequence[str], str], table_name: str, new_col_name: str, *, sql_column_names: Optional[Union[Sequence[str], str]] = None, sql_id_name: str = 'id', case_insensitive: bool = True, delete_old: bool = True, sql_columns_notnull: Optional[Sequence[str]] = None, column_split: Optional[Mapping[str, str]] = None, schema: Optional[str] = None, filter_columns: Optional[Sequence[str]] = None) → None[source]