Filters

ValuesMatchPossibilitiesFilter

The ValuesMatchPossibilitiesFilter filter is used to exclude records where a field matches any of a list of possibilities.

The ValuesMatchPossibilitiesFilter is accessible via the implementation string nodestream.pipeline.filters:ValuesMatchPossibilitiesFilter.

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| fields | A list of field matchers. | List[FieldMatcher] | N/A | Yes |
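
A minimal configuration sketch, assuming the same layout as the SchemaEnforcer examples on this page. The `status` field and the two possibility values are made up for illustration, and the `!jmespath` tag is an assumption about how a JMESPath expression is written in the pipeline file. Each entry under `fields` is a FieldMatcher, described further down.

```yaml
filters:
  - implementation: nodestream.pipeline.filters:ValuesMatchPossibilitiesFilter
    fields:
      # One FieldMatcher: select the record's "status" (hypothetical field)
      # and compare it against the listed possibilities.
      - value: !jmespath 'status'
        possibilities:
          - active
          - pending
```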

ExcludeWhenValuesMatchPossibilities

The ExcludeWhenValuesMatchPossibilities filter is used to exclude records where a field matches any of a list of possibilities. The ExcludeWhenValuesMatchPossibilities is accessible via the implementation string nodestream.pipeline.filters:ExcludeWhenValuesMatchPossibilities.

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| fields | A list of field matchers. | List[FieldMatcher] | N/A | Yes |

FieldMatcher

A FieldMatcher is an object with the following properties:

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| value | An expression that selects the value of the field to match on. | JMESPath | N/A | Yes |
| possibilities | A list of values to match against. | List[Any] | N/A | Yes |
| normalization | A list of normalization functions to apply to the field value before matching. See the normalization reference for flags. | Dict[str, Any] | N/A | No |
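
As a rough sketch of a FieldMatcher that uses all three keys, the fragment below applies a normalization flag before matching. The `email_domain` field and its possibilities are hypothetical, the `!jmespath` tag is assumed, and `do_lowercase_strings` is assumed to be one of the flags listed in the normalization reference; check that reference for the actual flag names.

```yaml
filters:
  - implementation: nodestream.pipeline.filters:ExcludeWhenValuesMatchPossibilities
    fields:
      - value: !jmespath 'email_domain'   # hypothetical field name
        possibilities:
          - example.com
          - example.org
        normalization:
          # Assumed flag name: lowercases the selected value before it is
          # compared against the possibilities.
          do_lowercase_strings: true
```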

SchemaEnforcer

The SchemaEnforcer filter is used to enforce a schema on the records being processed. The SchemaEnforcer is accessible via the implementation string nodestream.pipeline.filters:SchemaEnforcer.

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| enforcement_policy | The policy to use when enforcing the schema. enforce strictly filters out records that violate the schema. warn only logs that records are violating the schema. | string | enforce | No |
| key | The key in the object store under which the schema is stored. | string | None | No |
| inference_sample_size | The number of records to use when inferring the schema. When set to None, the schema is not inferred and must instead be provided via key. | int | 1000 | No |

Examples

Infer schema from records

filters:
  - implementation: nodestream.pipeline.filters:SchemaEnforcer
    enforcement_policy: enforce
    inference_sample_size: 1000

Use a predefined schema

filters:
  - implementation: nodestream.pipeline.filters:SchemaEnforcer
    enforcement_policy: enforce
    key: schema # This key must be stored inside the object store at <<pipeline_file_sha_256>>/<<step-index>>/schema
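
Log violations without dropping records

The options table also lists a warn policy. A sketch of how that might look, following the layout of the examples above; the sample size of 500 is an arbitrary illustrative value:

```yaml
filters:
  - implementation: nodestream.pipeline.filters:SchemaEnforcer
    # warn logs records that violate the schema instead of filtering them out.
    enforcement_policy: warn
    inference_sample_size: 500
```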

Reminder

Remember that you can always build your own filters by implementing the Filter interface. See here for more information.