# Source code for record_convertor.command_processor

"""Module to provide ProcessCommand class.

This class allows you to do a number of value conversions on a record. This is
usually done prior to creating a new record from this existing record, thus
ensuring a well formatted record prior to processing.

Conditions can be included and conversions will only be executed if all
conditions comply.

Available conversion commands
    - $fixed_value
        Returns the fixed value given in the arguments
    - $split_field
        Splits a field based upon a given seperator and return a given entry
        (index) from the resulting split list
        Args: field_name (str), seperator (str), index (int)
    - $int_from_string
        returns a numerical value (as a string) from the input string based
        upon the first sequence of numerical characters found in the string.
        Characters can be removed first by adding them to the list in the
        `seperators` argument.
        '123 456.00 EUR' can be converted to '123456' with seperators arg [' ']
        Args: field_name (str), seperators (list of str)
    - $join
        return a joined set of fields and optionally fixed values with
        optionally a seperator in between the fields
        Args: [list of str] where str's are
            - `$seperatorX` where X is the seperator. Field is optional
               but if used should always be at index 0 of the list
            - <field_name> value will be retrieved from entry in record with
              `field_name`
            - <$fixed_value> value `fixed_value` will be used
    - $normalized_address
        returns a normalized address as string using an external Geo API
        Args: address (str), zip_code (str), city (str),
              iso3116_country_code (str)
            -> all mandatory, representing the (nested) field in the
               record where the actual values are to be retrieved
    - $point
        returns a geojson Point dict
        Args: lat (str), lon (str)
            ->  all mandatory, representing the (nested) field in the record
                where the actual values are to be retrieved
    - $full_record
        returns the full record
        Args: none
    - $join_key_value
        returns a key value pair from two record fields defined in the args.
        If no key value pair can be made with the retrieved values, None is
        returned
        Args: key(str), value(str)
            ->  both mandatory, representing the (nested) field in the record
                where the actual values are to be retrieved
    - $get_coordinates
        returns a geoJson point dict representing the address in lon, lat.
        Args: address_keys(list of str), zip_code(str), city (str),
              iso3116_country_code (str)
            -> all are optional representing the (nested) field in the record
               where the actual values are to be retrieved
            -> address field is created by joining the different values
               retrieved with the list of address_keys
    - $from_list
        returns a list of dicts created from specific fields in a list of dicts
        provided by the record. This method can be used to transform a list
        of dicts into the correct format.
        Args: list_field_name (str) -> key to retrieve the input list of dicts
              <keys> (str) -> keys to be used in new list of dicts. Value for
                              this key is retrieved with the value of this key
                              i.e. 'target_field1': 'field1' will result in
                              'target_field1': list_item['field1']
    - $to_list
        returns a list with values retrieved from the record. None values are
        skipped.
        Args: List of keys (str) for which the values need to be returned in
              the return list
    - $to_list_dynamic
        returns a list with the results of the rule sets provided in the input
        list. This allows to create a list of more complex objects
        Args: List of rule dicts that result in the required data objects
              when processed against the input record. Each rule dict will
              result in a single entry in the list
    - $to_int
        returns a string from the record with all strings removed that are
        indicated by the `strip` entry in the arguments.
        Args: field_name (str) -> field name from where to retrieve the string
              strip (list of str or str) -> strings that need to be removed
                                            from the input string
    - $set_to_none_value
        return a None value

    - $allow_none_value
        retrieves a value from the record but if not found leaves a None value
        instead of skipping the field

    - $current_year
        sets the field value to the current year as a str
"""

import re
from datetime import datetime
from typing import Union

import jmespath
from jmespath.exceptions import ParseError

from .command_helper import (
    lat_lon_to_geojson_point,
    process_args_is_dict,
    process_args_is_list,
)

__all__ = ["ProcessCommand"]


class ProcessCommand:
    """
    Create a value for the output record, mostly based upon one or more fields
    from the input record.

    args:
        record (dict): record that needs some conversion action
        process_command (str): process command, including its leading `$`, to
            be executed to obtain the correct value from the record
        process_args (str, list, dict): arguments needed to run the process
            command
        record_convertor: callable invoked as
            `record_convertor(rules=..., record=...)`; the returned object must
            provide a `convert()` method. Used by the commands that process
            nested rule sets.
        add_process_commands (dict): maps custom command names to callables
            invoked as `command(record, process_args)`; the returned object
            must provide a `convert()` method

    returns:
        value (dict, list, str, int, float): output of the conversion, obtained
        by calling `get_value()`
    """

    def __init__(
        self,
        record: dict,
        process_command: str,
        process_args: Union[dict, list, str],
        record_convertor,
        add_process_commands=None,
    ):
        self.record = record
        # remove the `$` from the command (the first character is dropped
        # without checking that it actually is a `$`)
        self.process_command = process_command[1:]
        self.process_args = process_args
        self.record_convertor = record_convertor
        self.add_process_commands = add_process_commands or {}

    def current_year(self):
        """Returns the current (local time) year as a string"""
        return str(datetime.now().year)

    def set_to_none_value(self):
        """Returns None value"""
        return None

    def allow_none_value(self):
        """Returns value for field and None if no field can be found"""
        process_args = process_args_is_dict(self.process_args)
        return self._get_field(process_args.get("field_name"))

    def to_list(self):
        """
        Retrieve the values for a list of fields and return them as a list.

        Every falsy value (None, but also 0, "" and empty containers) is left
        out of the result.
        """
        process_args = process_args_is_list(self.process_args)
        values = (self._get_field(field_name) for field_name in process_args)
        return [value for value in values if value]

    def to_int(self):
        """
        Return the field value with all `strip` strings removed from it.

        Despite the command name the result is a string, not an int. Returns
        None when the field is missing or its value is falsy.
        """
        process_args = process_args_is_dict(self.process_args)
        amount = self._get_field(process_args.get("field_name"))
        if not amount:
            return None

        remove_list = process_args.get("strip")
        if remove_list is None:
            remove_list = []
        elif isinstance(remove_list, (str, int)):
            remove_list = [remove_list]

        for item in remove_list:
            # coerce both sides to str so numeric field values and numeric
            # `strip` entries do not break the replacement
            amount = str(amount).replace(str(item), "")
        return amount

    def first_item_from_list(self):
        """Returns the first entry of the `from_list` result, or None"""
        items_from_list = self.from_list()
        if items_from_list:
            return items_from_list[0]
        return None

    def from_list(self):
        """
        Convert a list of dicts from the input record to a new cleaned list
        of dicts.

        The `list_field_name` argument locates the input list; all remaining
        arguments are passed as rules to `record_convertor` for every list
        item. Falsy conversion results are dropped. Returns an empty list when
        the field does not hold a non-empty list.

        raises:
            KeyError: if `list_field_name` is missing from the arguments
        """
        process_args = process_args_is_dict(self.process_args)
        # copy so that popping the field name does not alter the caller's rules
        rules = process_args.copy()
        obj_list = self._get_field(rules.pop("list_field_name"))
        if not (obj_list and isinstance(obj_list, list)):
            return []

        converted = (
            self.record_convertor(rules=rules, record=obj).convert()
            for obj in obj_list
        )
        return [item for item in converted if item]

    def to_list_dynamic(self):
        """
        Return a list with the result of each rule set in the arguments
        applied to the full record. Falsy results are dropped.
        """
        converted = (
            self.record_convertor(rules=rule, record=self.record).convert()
            for rule in self.process_args
        )
        return [item for item in converted if item]

    def join_key_value(self):
        """
        Return a single `{key: value}` dict where both key and value are
        retrieved from the record.

        The `key` and `value` arguments are either a (nested) field name or a
        rule dict that is resolved with `record_convertor`. Returns None when
        either resolved value is falsy or the key cannot be used as dict key.

        raises:
            KeyError: if the `key` or `value` argument is missing
        """
        process_args = process_args_is_dict(self.process_args)
        key_key = process_args.get("key", False)
        value_key = process_args.get("value", False)
        if not (key_key and value_key):
            raise KeyError("Missing `key` or `value` argument")

        # check if key value needs to be composed
        if isinstance(key_key, dict):
            key = self.record_convertor(rules=key_key, record=self.record).convert()
        else:
            key = self._get_field(key_key)

        # check if value value needs to be composed
        if isinstance(value_key, dict):
            value = self.record_convertor(
                rules=value_key, record=self.record
            ).convert()
        else:
            value = self._get_field(value_key)

        if not (key and value):
            return None
        try:
            return {key: value}
        except (TypeError, KeyError):
            # e.g. an unhashable key retrieved from the record
            return None

    def key_value(self):
        """
        Return `{key: value}` where `key` is used literally and `value` is
        the field name used to retrieve the value from the record.

        raises:
            KeyError: if the `key` or `value` argument is missing
        """
        process_args = process_args_is_dict(self.process_args)
        key = process_args.get("key", False)
        value = process_args.get("value", False)
        if not (key and value):
            raise KeyError("Missing `key` or `value` argument")
        return {key: self._get_field(value)}

    def full_record(self):
        """returns the full record"""
        return self.record

    def point(self):
        """
        Retrieve the lat, lon fields from the record and return them in point
        format (as produced by `lat_lon_to_geojson_point`).

        raises:
            ValueError: if the `lat` or `lon` argument is missing
        """
        process_args = process_args_is_dict(self.process_args)

        # check if lat and lon field names are provided in the value
        lat_field = process_args.get("lat", False)
        lon_field = process_args.get("lon", False)
        if not (lat_field and lon_field):
            raise ValueError("Both lat and lon field required for Point Field")

        # get the latitude and longitude from the record and return point
        # field
        lat = self._get_field(lat_field)
        lon = self._get_field(lon_field)
        return lat_lon_to_geojson_point(latitude=lat, longitude=lon)

    def join(self):
        """
        Join the record values for the list of keys to a single string.

        An optional first entry `$seperatorX` sets the single character X as
        seperator (only the last character of that entry is used). Entries
        starting with `$` are fixed values, all other entries are (nested)
        field names. The joined result is stripped of surrounding whitespace.

        raises:
            ValueError: if the arguments are not a list
        """

        def join_value(key):
            """
            Return the actual value (string) that belongs to the given key
            """
            # check if fixed value needs to be returned
            if key[0] == "$":
                return key[1:]

            # if not fixed value then return the value that belongs to the
            # given (nested) key(s)
            res = self._get_field(key)
            return "" if res is None else str(res)

        if not isinstance(self.process_args, list):
            raise ValueError("provided list of keys is not of type list")

        seperator = ""
        # set seperator if defined and remove it from the list of keys;
        # an empty list simply joins to an empty string
        join_arguments = self.process_args.copy()
        if (
            join_arguments
            and isinstance(join_arguments[0], str)
            and "$seperator" in join_arguments[0]
        ):
            remainder = join_arguments.pop(0).partition("$seperator")[2]
            # a bare `$seperator` means "no seperator", not the letter `r`
            seperator = remainder[-1] if remainder else ""

        try:
            return seperator.join(join_value(key) for key in join_arguments).strip()
        except KeyError:
            return None

    def int_from_string(self):
        """
        Return the first sequence of digits found in the field value, as a
        string, after removing every string listed in `seperators`.

        Returns None when an argument is missing or empty, when the field
        value is not a string or when it holds no digits.
        """
        process_args = process_args_is_dict(self.process_args)
        seperators = process_args.get("seperators", False)
        field_name = process_args.get("field_name", False)
        # NOTE(review): an absent/empty `seperators` argument yields None even
        # when the field holds digits -- confirm this is intended
        if not (seperators and field_name):
            return None

        field_value = self._get_field(field_name)
        if not isinstance(field_value, str):
            return None

        for seperator in seperators:
            field_value = field_value.replace(seperator, "")

        match = re.search(r"\d+", field_value)
        return match.group() if match else None

    def split_field(self):
        """
        Split the requested field and return a specific entry from the split
        result.

        Returns None when an argument is missing, when the field cannot be
        found or split, or when the index is out of range.
        """
        process_args = process_args_is_dict(self.process_args)
        seperator = process_args.get("seperator", False)
        field_name = process_args.get("field_name", False)
        index = process_args.get("index", False)
        # index 0 is a valid value, hence the explicit comparison with False
        if not (seperator and field_name and (index is not False)):
            return None

        field_value = self._get_field(field_name)
        if field_value is None:
            return None

        try:
            return field_value.split(seperator)[index]
        except (IndexError, AttributeError):
            return None

    def fixed_value(self):
        """return the (fixed) value given in the conversion args"""
        return self.process_args

    def get_value(self):
        """
        Call the actual process command and return its result.

        The command is first looked up in the public command methods of this
        class. Instance attributes, private names and `get_value` itself are
        never treated as commands. If no command method matches, the command
        minus its first character is looked up in the custom commands.

        raises:
            NotImplementedError: if the command is neither a built-in nor a
                custom command
        """
        command = self.process_command

        handler = None
        if (
            not command.startswith("_")
            and command != "get_value"
            and command not in vars(self)
        ):
            handler = getattr(self, command, None)
        if callable(handler):
            return handler()

        # NOTE(review): the leading `$` was already removed in __init__, so a
        # second character is dropped here -- presumably custom commands carry
        # a two character prefix; confirm against the callers
        cust_command = command[1:]
        cust_comm_class = self.add_process_commands.get(cust_command, None)
        if cust_comm_class:
            return cust_comm_class(self.record, self.process_args).convert()

        raise NotImplementedError(f"Field conversion command `{self.process_command}`")

    def _get_field(self, key, rec=None):
        """
        Return the value for the (dot separated, nested) key from the record.

        args:
            key (str): (nested) field name, e.g. `parent.child`
            rec (dict): record to search, defaults to the instance record

        returns:
            the value found, or None if the key is empty, not found or cannot
            be parsed by jmespath
        """
        # only fall back to the instance record when no record is given; an
        # explicitly passed empty record must not be replaced
        record = self.record if rec is None else rec
        if not key:
            return None

        # key elements in nested keys are surrounded with "". For example
        # key.example-1 becomes "key"."example-1".
        # Needed so jmespath can handle special characters in the keys
        nested_key = ".".join(f'"{part}"' for part in key.split("."))
        try:
            return jmespath.search(nested_key, record)
        except ParseError:
            # best effort: a key jmespath cannot parse counts as "not found"
            return None