
Extractors

Extractors are the start of a pipeline and are used to pull raw data from a source. Nodestream has a number of extractors built in and you can also create your own.

The File Extractor Family

There are a family of extractors that are used to pull data from files. These include:

  • FileExtractor - Used to pull data from a file on the local filesystem.
  • RemoteFileExtractor - Used to pull data from an HTTP server.
  • S3Extractor - Used to pull data from a file in an S3 bucket.

Regardless of where the file comes from, each of these extractors interprets the file contents the same way, based on the file extension:

  • .csv files are supported by default and yield each row of the csv file as a dictionary to the next step in the pipeline.
  • .json files are supported by default and yield the entire JSON object to the next step in the pipeline.
  • .jsonl files are supported by default and yield each line of the JSONL file as a dictionary to the next step in the pipeline.
  • .parquet files are supported by default and yield each row of the parquet file as a dictionary to the next step in the pipeline.
  • .txt files are supported by default and yield each line of the text file as a dictionary to the next step in the pipeline. (The key is line and the value is the line of text.)
  • .yaml files are supported by default and yield the entire YAML object to the next step in the pipeline.

Compressed file formats are also supported:

  • .gz: {File Format Extension}.gz files are decompressed using gzip.open, stripped of the .gz extension, and then processed according to the remaining file extension.
  • .bz2: {File Format Extension}.bz2 files are decompressed using bz2.open, stripped of the .bz2 extension, and then processed according to the remaining file extension.
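
As an illustration, a single-file extraction step might look like the sketch below. The implementation string and the globs argument are assumptions modeled on the local source of the UnifiedFileExtractor described in the next section; confirm both against your installed version of nodestream.

```yaml
# Hypothetical pipeline step: read every CSV under ./data and yield each row
# as a dictionary to the next step. The .gz file is decompressed first and
# then parsed according to its remaining extension, as described above.
- implementation: nodestream.pipeline.extractors.files:FileExtractor
  arguments:
    globs:
      - data/*.csv
      - data/archive/*.csv.gz
```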

UnifiedFileExtractor

The UnifiedFileExtractor is used to pull data from a file on the local filesystem, an HTTP server, or an S3 bucket. The UnifiedFileExtractor is accessible via the implementation string nodestream.pipeline.extractors.files:UnifiedFileExtractor.

The arguments for the UnifiedFileExtractor are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| sources | A list of sources to pull data from. | List[Dict] | Yes | N/A |

Each object in the sources list should include a type key that specifies the type of extractor to use and the arguments for that extractor.

For example, to pull data from a local file, an HTTP server, and an S3 bucket, you would use the following configuration:

implementation: nodestream.pipeline.extractors.files:UnifiedFileExtractor
arguments:
  sources:
    - type: local
      globs:
        - /path/to/local/file.csv
    - type: remote
      urls:
        - https://example.com/remote/file.csv
    - type: s3
      bucket: my-bucket
      prefix: path/to/s3/file.csv

Local Arguments

The arguments for the local source are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| globs | A list of glob strings representing the files to load. | List[str] | Yes | N/A |

Remote Arguments

The arguments for the remote source are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| urls | A list of URLs representing the files to load. | List[str] | Yes | N/A |
| memory_spooling_size_in_mb | The size of the memory spooling buffer in MB. | int | No | 10 |

S3 Arguments

The arguments for the s3 source are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| bucket | The name of the S3 bucket. | str | Yes | N/A |
| prefix | Filter the objects pulled from S3 to only those whose names have this prefix. | str | No | "" |
| object_format | The format is inferred from the file extension. If the file extension is not recognized, you can specify the format here. | str | No | "" |
| archive_dir | After an object has been processed, move it from its current location to the specified archive folder inside the bucket. Objects inside this folder are ignored when processing. | str | No | "" |
| assume_role_arn | The ARN of the role to assume when pulling data from S3. | str | No | "" |
| assume_role_external_id | The external ID to use when assuming the role. | str | No | "" |
| **session_args | Additional arguments to pass to the boto3.Session function. | str | No | {} |
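
For example, an s3 source that archives processed objects and assumes a role might be configured like the sketch below; the bucket name, prefix, and role ARN are placeholders.

```yaml
implementation: nodestream.pipeline.extractors.files:UnifiedFileExtractor
arguments:
  sources:
    - type: s3
      bucket: my-bucket                 # placeholder bucket name
      prefix: exports/2024/             # only objects whose keys start with this prefix are processed
      archive_dir: archive              # processed objects are moved here and ignored on later runs
      assume_role_arn: arn:aws:iam::123456789012:role/example-reader   # placeholder role ARN
```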

QueueConnector

The QueueConnector describes how to poll data from the underlying queue mechanism.

AWS SQS

- implementation: nodestream.pipeline.extractors.queues:QueueExtractor
  arguments:
    # rest of the stream extractor format arguments
    connector: sqs
    queue_url: "https://sqs.us-east-1.amazonaws.com/177715257436/MyQueue"

Additional Arguments

With the previous minimal configuration, the extractor will use your currently active AWS credentials to read messages from https://sqs.us-east-1.amazonaws.com/177715257436/MyQueue. However, there are many options you can add to this:

| Parameter Name | Type | Description |
|----------------|------|-------------|
| message_system_attribute_names | String | A list of attributes that need to be returned along with each message. (Default: "All") |
| message_attribute_names | String | A list of attribute names to receive. (Default: "All") |
| delete_after_read | Boolean | Deletes the batch of messages from the queue after they are yielded to the next pipeline step. (Default: True) |
| assume_role_arn | String | The ARN of a role to assume before interacting with the SQS queue. Of course, the appropriate configuration is needed on both the current credentials as well as the target role. |
| assume_role_external_id | String | The external ID that is required to assume the role. Only used when assume_role_arn is set and only needed when the role is configured to require an external ID. |
| **session_args | Any | Any other arguments that you want sent to the boto3.Session that will be used to interact with AWS. |
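
Putting several of these options together, a fuller SQS configuration might look like the sketch below; the queue URL, role ARN, and external ID are placeholders.

```yaml
- implementation: nodestream.pipeline.extractors.queues:QueueExtractor
  arguments:
    connector: sqs
    queue_url: "https://sqs.us-east-1.amazonaws.com/177715257436/MyQueue"
    delete_after_read: false                                               # keep messages on the queue after yielding them
    assume_role_arn: "arn:aws:iam::123456789012:role/example-sqs-reader"   # placeholder role ARN
    assume_role_external_id: "example-external-id"                         # placeholder external ID
    region_name: us-east-1                                                 # extra argument forwarded to boto3.Session
```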

StreamExtractor

The StreamExtractor is used to pull data from a streaming backend like Kafka. The StreamExtractor is accessible via the implementation string nodestream.pipeline.extractors.streams:StreamExtractor.

The arguments for the StreamExtractor are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| connector | The name of the connector to use. (Only kafka is currently supported.) | str | Yes | N/A |
| record_format | The format of the records. (Only json is currently supported.) | str | No | json |

Kafka

When using the kafka connector, the following additional arguments are available on the StreamExtractor:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| topic_name | The name of the topic to consume from. | str | Yes | N/A |
| group_id | The name of the consumer group. | str | No | N/A |
| bootstrap_servers | A list of Kafka servers to connect to. | List[str] | Yes | N/A |
| offset_reset | The offset to start consuming from. | str | No | latest |
| security_protocol | The security protocol to use. | str | No | PLAINTEXT |
| max_records | The maximum number of records to pull in one call. | int | No | 10 |
| poll_timeout | The maximum time to wait for records, in seconds. Once reached, the extractor signals the other steps to flush batched data from memory and then re-polls. | int | No | 30 |
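
For example, a Kafka-backed StreamExtractor might be configured like the sketch below; the topic, consumer group, and broker addresses are placeholders.

```yaml
- implementation: nodestream.pipeline.extractors.streams:StreamExtractor
  arguments:
    connector: kafka
    record_format: json
    topic_name: my-topic                 # placeholder topic
    group_id: my-consumer-group          # placeholder consumer group
    bootstrap_servers:
      - broker-1.example.com:9092        # placeholder brokers
      - broker-2.example.com:9092
    offset_reset: earliest               # start from the beginning of the topic
    security_protocol: PLAINTEXT
    max_records: 100
    poll_timeout: 60
```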

AthenaExtractor

The AthenaExtractor is used to pull data from an AWS Athena query. The AthenaExtractor is accessible via the implementation string nodestream.pipeline.extractors.stores.aws:AthenaExtractor. The arguments for the AthenaExtractor are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| query | The query to run. The results yielded by the extractor will reflect the shape of the data returned from the query. | str | Yes | N/A |
| workgroup | The name of the workgroup to use. See the AWS docs for more information. | str | Yes | N/A |
| database | The name of the database to use. | str | Yes | N/A |
| output_location | The output location string where Athena stores query results. See the AWS docs for more information. | str | Yes | N/A |
| poll_interval_seconds | The interval in seconds to poll for the results of the query. | int | No | 1 |
| page_size | The number of records to pull in one call. | int | No | 500 |
| assume_role_arn | The ARN of the role to assume when pulling data from Athena. | str | No | "" |
| assume_role_external_id | The external ID to use when assuming the role. | str | No | "" |
| **session_args | Additional arguments to pass to the boto3.Session function. | str | No | {} |
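
A minimal AthenaExtractor configuration might look like the sketch below; the query, workgroup, database, and output location are placeholders.

```yaml
- implementation: nodestream.pipeline.extractors.stores.aws:AthenaExtractor
  arguments:
    query: SELECT id, name FROM people               # each result row is yielded as a record
    workgroup: primary                               # placeholder workgroup
    database: my_database                            # placeholder database
    output_location: s3://my-bucket/athena-results/  # placeholder output location
    poll_interval_seconds: 2
    page_size: 1000
```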

SimpleApiExtractor

The SimpleApiExtractor is used to pull data from a simple JSON API. The SimpleApiExtractor is accessible via the implementation string nodestream.pipeline.extractors:SimpleApiExtractor. The arguments for the SimpleApiExtractor are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| url | The URL of the API to call. | str | Yes | N/A |
| yield_from | The key to yield from the response. This will likely be a list of objects in the response. | str | No | Yields the entire response if unset. |
| offset_query_param | The name of the query parameter to use for pagination. If set, pagination will occur. | str | No | Unset. Does not paginate. |
| headers | A dictionary of headers to send with the request. | dict | No | {} |
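
For example, a paginated API might be consumed with a configuration like the sketch below; the URL, response key, and header value are placeholders.

```yaml
- implementation: nodestream.pipeline.extractors:SimpleApiExtractor
  arguments:
    url: https://api.example.com/users      # placeholder API endpoint
    yield_from: results                     # yield each item of the "results" list in the response
    offset_query_param: offset              # enables pagination via ?offset=<n>
    headers:
      Authorization: Bearer example-token   # placeholder credential
```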

DynamoDBExtractor

The DynamoDBExtractor issues a query to an Amazon DynamoDB table using the scan method. The details of this AWS API call can be found here, and some of the parameters exposed by the interface are shown below.

- implementation: nodestream.pipeline.extractors.stores.aws:DynamoDBExtractor
  arguments:
    table_name: test_table
    limit: 100
    scan_filter:
      attribute_name:
        AttributeValueList:
          - S: 'some_string'
        ComparisonOperator: 'EQ'
    projection_expression: 'string expression'
    filter_expression: 'string expression'

The arguments for the DynamoDBExtractor are:

| Parameter Name | Type | Description |
|----------------|------|-------------|
| table_name | String | The name of the DynamoDB table within the account. |
| limit | Integer | The maximum number of records to be collected from the table for each call. |
| scan_filter | Dict | Filter for the results to be returned; does not minimize DynamoDB credit usage. See the DynamoDB Scan docs for detailed information on use. |
| projection_expression | String | String expression for projecting the results. See the DynamoDB Projection docs for detailed information on the format. |
| filter_expression | String | String expression for filtering the results (alternative to the scan_filter). See the DynamoDB Filter docs for detailed information on the format. |
| assume_role_arn | String | The ARN of a role to assume before interacting with the table. Of course, the appropriate configuration is needed on both the current credentials as well as the target role. |
| assume_role_external_id | String | The external ID that is required to assume the role. Only used when assume_role_arn is set and only needed when the role is configured to require an external ID. |
| **session_args | Any | Any other arguments that you want sent to the boto3.Session that will be used to interact with AWS. |

TimeToLiveConfigurationExtractor

The TimeToLiveConfigurationExtractor is used to pull data for a Time To Live Pipeline. The TimeToLiveConfigurationExtractor is accessible via the implementation string nodestream.pipeline.extractors:TimeToLiveConfigurationExtractor. The arguments for the TimeToLiveConfigurationExtractor are:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| configurations | A list of configurations for the Time To Live pipeline. | List[Dict[str, Any]] | Yes | N/A |
| graph_object_type | The type of object to apply the TTL to. (NODE or RELATIONSHIP) | str | Yes | N/A |
| override_expiry_in_hours | The number of hours after which the object should be deleted. Overrides any locally set value. | int | No | N/A |

Each object in the configurations list should include the following arguments:

| Argument | Description | Type | Required | Default Value |
|----------|-------------|------|----------|---------------|
| object_type | The object type to apply the TTL to. (Node or relationship type) | str | Yes | N/A |
| expiry_in_hours | The number of hours after which the object should be deleted. | int | No | N/A |
| enabled | Whether or not the TTL is enabled. | bool | No | True |
| batch_size | The number of objects to delete in a single batch. | int | No | 100 |
| custom_query | A custom query to use to delete the objects. If not provided, the default query will be used. The custom query is database-implementation specific. | str | No | N/A |
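
Putting this together, a Time To Live pipeline configuration might look like the sketch below; the object types and expiry values are examples only.

```yaml
- implementation: nodestream.pipeline.extractors:TimeToLiveConfigurationExtractor
  arguments:
    graph_object_type: NODE              # apply these TTLs to nodes
    configurations:
      - object_type: Person              # example node type
        expiry_in_hours: 96
      - object_type: Session             # example node type
        expiry_in_hours: 24
        batch_size: 250
```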

Reminder

Remember that you can always build your own extractors by implementing the Extractor interface. See here for more information.