dataframely.collection module

class dataframely.collection.Collection

Bases: BaseCollection, ABC

Base class for all collections of data frames with a predefined schema.

A collection consists of a set of members which are collectively “consistent”, meaning that the collection ensures that invariants hold across members. This differs from dataframely schemas, which only ensure invariants within individual members.

In order to properly ensure that invariants hold up across members, members must have a “common primary key”, i.e. there must be an overlap of at least one primary key column across all members. Consequently, a collection is typically used to represent “semantic objects” which cannot be represented in a single data frame due to 1-N relationships that are managed in separate data frames.
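The “common primary key” is the set of primary key columns that all members share. The stdlib-only sketch below models it with members represented only by their primary key column names. The member and column names are hypothetical, and this is an illustration of the idea rather than dataframely's implementation, which exposes the result via common_primary_keys().

```python
def common_primary_keys(
    member_primary_keys: dict[str, list[str]],
    ignored: frozenset[str] = frozenset(),
) -> list[str]:
    """Return the primary key columns shared by all non-ignored members."""
    key_sets = [
        set(keys)
        for name, keys in member_primary_keys.items()
        if name not in ignored
    ]
    if not key_sets:
        return []
    # Sorted only to make the result deterministic in this sketch.
    return sorted(set.intersection(*key_sets))


# Hypothetical 1-N relationship: one invoice, many diagnoses per invoice.
members = {
    "invoice": ["invoice_id"],
    "diagnosis": ["invoice_id", "diagnosis_code"],
}
print(common_primary_keys(members))  # ['invoice_id']
```

The one-row-per-invoice member and the many-rows-per-invoice member overlap in invoice_id, which is exactly the column along which a collection can keep the two consistent.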

All type annotations of a collection must be dataframely.LazyFrame with a known schema:

class MyCollection(dy.Collection):
    first_member: dy.LazyFrame[MyFirstSchema]
    second_member: dy.LazyFrame[MySecondSchema]

In addition, it may define filters (cf. filter()) and arbitrary methods.

Note:

The dataframely mypy plugin ensures that the dictionaries passed to class methods contain exactly the required keys.

Attention:

Do NOT use this class in combination with from __future__ import annotations, as the collection requires the actual schema definitions (not their string representations) to ensure that it is implemented correctly.
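The Attention note above comes down to how Python stores annotations. The stdlib-only snippet below (no dataframely involved) compiles the same class body with and without the future import: with it, annotations are kept as plain strings, so any code that inspects them when the class is created (presumably how a collection discovers its member schemas) sees text such as "dy.LazyFrame[MyFirstSchema]" instead of the schema class.

```python
# Compile two otherwise identical class definitions, one with postponed
# evaluation of annotations (PEP 563) and one without.
with_future = "from __future__ import annotations\nclass Members:\n    first_member: int\n"
without_future = "class Members:\n    first_member: int\n"

ns_future: dict = {}
ns_plain: dict = {}
exec(with_future, ns_future)
exec(without_future, ns_plain)

# With the future import, the annotation is only the string "int" ...
print(ns_future["Members"].__annotations__)  # {'first_member': 'int'}
# ... without it, the annotation is the actual class object.
print(ns_plain["Members"].__annotations__)   # {'first_member': <class 'int'>}
```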

Methods

cast(data, /)

Initialize a collection by casting all members into their correct schemas.

collect_all()

Collect all members of the collection.

common_primary_keys()

The primary keys shared by non-ignored members of the collection.

create_empty()

Create an empty collection without any data.

filter(data, /, *[, cast])

Filter the members' data frames by their schemas and the collection's filters.

ignored_members()

The names of all members of the collection that are ignored in filters.

is_valid(data, /, *[, cast])

Utility method to check whether validate() raises an exception.

join(primary_keys[, how, maintain_order])

Filter the collection by joining onto a data frame containing entries for the common primary key columns whose respective rows should be kept or removed in the collection members.

matches(other)

Check whether this collection semantically matches another.

member_schemas()

The schemas of all members of the collection.

members()

Information about the members of the collection.

non_ignored_members()

The names of all members of the collection that are not ignored in filters (default).

optional_members()

The names of all optional members of the collection.

read_delta(source, *[, validation])

Read all collection members from Delta Lake tables.

read_parquet(directory, *[, validation])

Read all collection members from parquet files in a directory.

required_members()

The names of all required members of the collection.

sample([num_rows, overrides, generator])

Create a random sample from the members of this collection.

scan_delta(source, *[, validation])

Lazily read all collection members from Delta Lake tables.

scan_parquet(directory, *[, validation])

Lazily read all collection members from parquet files in a directory.

serialize()

Serialize this collection to a JSON string.

sink_parquet(directory, **kwargs)

Stream the members of this collection into parquet files in a directory.

to_dict()

Return a dictionary representation of this collection.

validate(data, /, *[, cast])

Validate that a set of data frames satisfies the collection's invariants.

write_delta(target, **kwargs)

Write the members of this collection to Delta Lake tables.

write_parquet(directory, **kwargs)

Write the members of this collection to parquet files in a directory.

classmethod cast(data: Mapping[str, FrameType], /) → Self

Initialize a collection by casting all members into their correct schemas.

This method calls cast() on every member, thereby removing superfluous columns and casting all input data frames to the correct dtypes.

You should typically use validate() or filter() to obtain instances of the collection, as this method does not guarantee that the returned collection upholds any invariants. Nonetheless, it may be useful in cases where the provided data is known to adhere to the collection’s invariants.

Args:

data: The data for all members. The dictionary must contain exactly one entry per member with the name of the member as key.

Returns:

The initialized collection.

Raises:

ValueError: If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.

Attention:

For lazy frames, casting is not performed eagerly. This avoids collecting the lazy frames’ schemas but also means that a later call to collect() might fail because of the cast and/or missing columns.
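The deferred failure described above is ordinary lazy evaluation. The plain-Python sketch below mimics cast() on a single member, with a generator standing in for a lazy frame: superfluous columns are dropped and values are converted, but a missing column only surfaces once the rows are consumed. Rows as dictionaries and Python types as dtypes are simplifications; this is not how dataframely or polars implement casting.

```python
from collections.abc import Iterable, Iterator

Row = dict[str, object]


def cast_rows(rows: Iterable[Row], schema: dict[str, type]) -> Iterator[Row]:
    """Lazily keep only the schema's columns and convert values to its types."""
    for row in rows:
        # Superfluous columns are dropped; a missing column raises KeyError.
        yield {col: typ(row[col]) for col, typ in schema.items()}


schema = {"id": int, "amount": float}
good = cast_rows([{"id": "1", "amount": "9.5", "debug": "x"}], schema)
print(list(good))  # [{'id': 1, 'amount': 9.5}]

bad = cast_rows([{"id": "1"}], schema)  # no error yet: nothing has been evaluated
# list(bad) raises KeyError('amount') only now, much like a later collect().
```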

collect_all() → Self

Collect all members of the collection.

This method collects all members in parallel for maximum efficiency. It is particularly useful when filter() is called with lazy frame inputs.

Returns:

The same collection with all members collected once.

Note:

As all collection members are required to be lazy frames, the returned collection’s members are still “lazy”. However, they are “shallow-lazy”, meaning they are obtained by calling .collect().lazy().

classmethod common_primary_keys() → list[str]

The primary keys shared by non-ignored members of the collection.

classmethod create_empty() → Self

Create an empty collection without any data.

This method simply calls create_empty() on all member schemas, including optional ones.

Returns:

An instance of this collection.

classmethod filter(data: Mapping[str, FrameType], /, *, cast: bool = False) → tuple[Self, dict[str, FailureInfo]]

Filter the members’ data frames by their schemas and the collection’s filters.

Args:

data: The members of the collection which ought to be filtered. The dictionary must contain exactly one entry per member with the name of the member as key, except for optional members which may be missing. All data frames passed here will be eagerly collected within the method, regardless of whether they are a DataFrame or LazyFrame.

cast: Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

Returns:

A tuple of two items:

  • An instance of the collection which contains a subset of each of the input data frames with the rows which passed member-wise validation and were not filtered out by any of the collection’s filters. While collection members are always instances of LazyFrame, the members of the returned collection are essentially eager as they are constructed by calling .lazy() on eager data frames. Just like in polars’ native filter(), the order of rows is maintained in all returned data frames.

  • A mapping from member name to a FailureInfo object which provides details on why individual rows were removed. Optional members are only included in this dictionary if they were provided in the input.

Raises:

ValueError: If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.

ValidationError: If the columns of any of the input data frames are invalid. This happens only if a data frame is missing a column defined in its schema or a column has an invalid dtype while cast is set to False.
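To make the two stages of filter() concrete, the plain-Python model below treats each member as a list of row dictionaries sharing one primary key column, member rules as predicates, and a collection filter as a function returning the primary key values to keep. The names (filter_collection, invoice, diagnosis, positive_amount) are hypothetical, failures are recorded as (row, reason) pairs instead of FailureInfo objects, and this is not dataframely's implementation.

```python
from collections.abc import Callable

Row = dict[str, object]


def filter_collection(
    data: dict[str, list[Row]],
    rules: dict[str, dict[str, Callable[[Row], bool]]],
    collection_filter: Callable[[dict[str, list[Row]]], set[object]],
    key: str,
) -> tuple[dict[str, list[Row]], dict[str, list[tuple[Row, str]]]]:
    failures: dict[str, list[tuple[Row, str]]] = {name: [] for name in data}

    # Stage 1: member-wise validation against each member's own rules.
    valid: dict[str, list[Row]] = {}
    for name, rows in data.items():
        valid[name] = []
        for row in rows:
            broken = [rule for rule, check in rules.get(name, {}).items() if not check(row)]
            if broken:
                failures[name].append((row, broken[0]))
            else:
                valid[name].append(row)

    # Stage 2: the collection filter decides which primary key values survive;
    # rows with a rejected key are removed from every member.
    keep = collection_filter(valid)
    result: dict[str, list[Row]] = {}
    for name, rows in valid.items():
        result[name] = [row for row in rows if row[key] in keep]  # order preserved
        failures[name] += [(row, "collection_filter") for row in rows if row[key] not in keep]
    return result, failures


invoices = [
    {"invoice_id": 1, "amount": 10.0},
    {"invoice_id": 2, "amount": -5.0},
    {"invoice_id": 3, "amount": 7.0},
]
diagnoses = [{"invoice_id": 1, "code": "A"}, {"invoice_id": 2, "code": "B"}]


def at_least_one_diagnosis(members: dict[str, list[Row]]) -> set[object]:
    # Hypothetical collection filter: keep invoices with at least one diagnosis.
    return {r["invoice_id"] for r in members["invoice"]} & {
        r["invoice_id"] for r in members["diagnosis"]
    }


result, failures = filter_collection(
    {"invoice": invoices, "diagnosis": diagnoses},
    rules={"invoice": {"positive_amount": lambda r: r["amount"] > 0}},
    collection_filter=at_least_one_diagnosis,
    key="invoice_id",
)
```

In this model, invoice 2 fails its member rule and invoice 3 (which has no diagnosis) is removed by the collection filter; the diagnosis of invoice 2 disappears as well because its invoice is no longer valid. Row order is kept, mirroring the guarantee stated above.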

classmethod ignored_members() → set[str]

The names of all members of the collection that are ignored in filters.

classmethod is_valid(data: Mapping[str, FrameType], /, *, cast: bool = False) → bool

Utility method to check whether validate() raises an exception.

Args:

data: The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key. The existence of all keys is checked via the dataframely mypy plugin.

cast: Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

Returns:

Whether the provided members satisfy the invariants of the collection.

Raises:

ValueError: If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
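Read together, the Returns and Raises sections mean that is_valid() turns validation failures into False but still lets a missing required member surface as a ValueError. The sketch below shows that control flow with a stand-in ValidationError class and a hypothetical validator; it is an illustration, not dataframely's source.

```python
from collections.abc import Callable, Mapping
from typing import Any


class ValidationError(Exception):
    """Stand-in for dataframely's ValidationError (illustration only)."""


def is_valid(validate: Callable[[Mapping[str, Any]], object], data: Mapping[str, Any]) -> bool:
    try:
        validate(data)
    except ValidationError:
        # Invalid data is reported as False ...
        return False
    # ... whereas a ValueError for a missing required member propagates.
    return True


def strict_validate(data: Mapping[str, Any]) -> None:
    # Hypothetical validator with one required member that must be non-empty.
    if "first_member" not in data:
        raise ValueError("missing required member 'first_member'")
    if not data["first_member"]:
        raise ValidationError("first_member must not be empty")


print(is_valid(strict_validate, {"first_member": [1]}))  # True
print(is_valid(strict_validate, {"first_member": []}))   # False
```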

join(primary_keys: LazyFrame, how: Literal['semi', 'anti'] = 'semi', maintain_order: Literal['none', 'left'] = 'none') → Self

Filter the collection by joining onto a data frame containing entries for the common primary key columns whose respective rows should be kept or removed in the collection members.

Args:

primary_keys: The data frame to join on. Must contain the common primary key columns of the collection.

how: The join strategy to use. Like in polars, semi keeps all rows that can be found in primary_keys, while anti removes them.

maintain_order: The maintain_order option to use for the polars join.

Returns:

The collection, with members potentially reduced in length.

Raises:

ValueError: If the collection contains any member that is annotated with ignored_in_filters=True.

Attention:

This method does not validate the resulting collection. Only use it if you are certain that the resulting collection still satisfies the collection’s filters. The joins are not evaluated eagerly. Therefore, a downstream call to collect() might fail, especially if primary_keys does not contain all common primary key columns.
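The semi/anti semantics can be pictured with plain Python: every member keeps (semi) or drops (anti) the rows whose common primary key values appear in primary_keys. In the sketch below, rows are dictionaries and join_collection is a hypothetical name; the real method builds lazy polars joins. Unlike maintain_order='none', this list-based sketch happens to preserve row order.

```python
Row = dict[str, object]


def join_collection(
    members: dict[str, list[Row]],
    primary_keys: list[Row],
    key_columns: list[str],
    how: str = "semi",
) -> dict[str, list[Row]]:
    if how not in ("semi", "anti"):
        raise ValueError("how must be 'semi' or 'anti'")
    # Project the key rows onto the common primary key columns.
    wanted = {tuple(row[c] for c in key_columns) for row in primary_keys}
    keep_matches = how == "semi"
    return {
        name: [
            row
            for row in rows
            # semi: keep rows whose key is in `wanted`; anti: keep the others.
            if (tuple(row[c] for c in key_columns) in wanted) == keep_matches
        ]
        for name, rows in members.items()
    }


members = {
    "invoice": [{"invoice_id": 1}, {"invoice_id": 2}],
    "diagnosis": [
        {"invoice_id": 1, "code": "A"},
        {"invoice_id": 2, "code": "B"},
        {"invoice_id": 2, "code": "C"},
    ],
}
kept = join_collection(members, [{"invoice_id": 2}], ["invoice_id"], how="semi")
removed = join_collection(members, [{"invoice_id": 2}], ["invoice_id"], how="anti")
print(kept["diagnosis"])     # [{'invoice_id': 2, 'code': 'B'}, {'invoice_id': 2, 'code': 'C'}]
print(removed["diagnosis"])  # [{'invoice_id': 1, 'code': 'A'}]
```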

classmethod matches(other: type[Collection]) → bool

Check whether this collection semantically matches another.

Args:

other: The collection to compare with.

Returns:

Whether the two collections are semantically equal.

Attention:

For custom filters, reliable comparison results are only guaranteed if the filter always returns a static polars expression. Otherwise, this function may falsely indicate a match.

classmethod member_schemas() → dict[str, type[Schema]]

The schemas of all members of the collection.

classmethod members() → dict[str, MemberInfo]

Information about the members of the collection.

classmethod non_ignored_members() → set[str]

The names of all members of the collection that are not ignored in filters (default).

classmethod optional_members() → set[str]

The names of all optional members of the collection.

classmethod read_delta(source: str | Path | deltalake.DeltaTable, *, validation: Validation = 'warn', **kwargs: Any) → Self

Read all collection members from Delta Lake tables.

This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.

Args:

source: The location or DeltaTable to read from.

validation: The strategy for running validation when reading the data:

  • "allow": The method tries to read the collection schema from the Delta table metadata. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema does not match this schema, no metadata can be found, or the tables have conflicting metadata, this method automatically runs validate() with cast=True.

  • "warn": The method behaves like "allow". However, it prints a warning if validation is necessary.

  • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

  • "skip": The method never runs validation and simply reads the data, trusting the user that the data is valid. Use this option carefully.

kwargs: Additional keyword arguments passed directly to polars.read_delta().
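The four validation strategies (shared by read_delta(), scan_delta(), read_parquet() and scan_parquet()) reduce to a small decision table. The sketch below encodes it as a hypothetical helper that reports whether validate(cast=True) has to run; stored_schema_matches=False stands for a mismatching, missing, or conflicting stored schema. The exception class is a stand-in, and the sketch is not dataframely's code.

```python
import warnings


class ValidationRequiredError(Exception):
    """Stand-in for dataframely's ValidationRequiredError (illustration only)."""


def resolve_validation(validation: str, stored_schema_matches: bool) -> bool:
    """Return True if the read must run validate(cast=True), else False."""
    if validation not in ("allow", "warn", "forbid", "skip"):
        raise ValueError(f"unknown validation strategy: {validation!r}")
    if validation == "skip" or stored_schema_matches:
        # Either the user vouches for the data or the stored schema matches.
        return False
    if validation == "forbid":
        raise ValidationRequiredError("no matching collection schema found")
    if validation == "warn":
        warnings.warn("stored collection schema does not match; validating")
    return True  # "allow" and "warn" fall back to validation
```

Only "warn" and "allow" ever trigger validation, and they differ solely in the warning; "forbid" turns the same situation into an error.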

Returns:

The initialized collection.

Raises:

ValidationRequiredError: If no collection schema can be read from the source and validation is set to "forbid".

ValueError: If the provided source does not contain Delta tables for all required members.

ValidationError: If the collection cannot be validated.

Attention:

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violations of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

Be aware that this method suffers from the same limitations as serialize().

classmethod read_parquet(directory: str | Path, *, validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn', **kwargs: Any) → Self

Read all collection members from parquet files in a directory.

This method searches for files named <member>.parquet in the provided directory for all required and optional members of the collection.

Args:

directory: The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.

validation: The strategy for running validation when reading the data:

  • "allow": The method tries to read the collection schema from the metadata of the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema does not match this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

  • "warn": The method behaves like "allow". However, it prints a warning if validation is necessary.

  • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

  • "skip": The method never runs validation and simply reads the data, trusting the user that the data is valid. Use this option carefully.

kwargs: Additional keyword arguments passed directly to polars.read_parquet().

Returns:

The initialized collection.

Raises:

ValidationRequiredError: If no collection schema can be read from the directory and validation is set to "forbid".

ValueError: If the provided directory does not contain parquet files for all required members.

ValidationError: If the collection cannot be validated.

Note:

This method is backward compatible with older versions of dataframely in which the schema metadata was saved to schema.json files instead of being encoded into the parquet files.

Attention:

Be aware that this method suffers from the same limitations as serialize().

classmethod required_members() → set[str]

The names of all required members of the collection.

classmethod sample(num_rows: int | None = None, *, overrides: Sequence[Mapping[str, Any]] | None = None, generator: Generator | None = None) → Self

Create a random sample from the members of this collection.

Just like sampling for schemas, this method should only be used for testing. Contrary to sampling for schemas, the core difficulty when sampling related data frames is that they must share primary keys while individual members may have a different number of rows. For this reason, overrides passed to this function must be “row-oriented” (or “sample-oriented”).

Args:

num_rows: The number of rows to sample for each member. If this is set to None, the number of rows is inferred from the length of the overrides.

overrides: The overrides to set values in member schemas. The overrides must be provided as a list of samples. The structure of the samples must be as follows:

{
    "<primary_key_1>": <value>,
    "<primary_key_2>": <value>,
    "<member_with_common_primary_key>": {
        "<column_1>": <value>,
        ...
    },
    "<member_with_superkey_of_primary_key>": [
        {
            "<column_1>": <value>,
            ...
        }
    ],
    ...
}

Any member/value can be left out and will be sampled automatically. Note that overrides for columns of members that are annotated with inline_for_sampling=True can be supplied at the top level instead of in a nested dictionary.

generator: The (seeded) generator to use for sampling data. If None, a generator with random seed is automatically created.
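To see what “row-oriented” means for the overrides structure above, the hypothetical helper below splits a list of samples into per-member rows: a nested dictionary becomes exactly one row, a nested list becomes any number of rows, and every row inherits the sample's primary key values. The member names are assumptions for illustration. Unlike sample(), this sketch does not generate values for anything left out and requires every sample to set the primary key.

```python
from typing import Any


def split_samples(
    samples: list[dict[str, Any]],
    primary_key: list[str],
    one_to_one: set[str],
    one_to_many: set[str],
) -> dict[str, list[dict[str, Any]]]:
    rows: dict[str, list[dict[str, Any]]] = {m: [] for m in one_to_one | one_to_many}
    for sample in samples:
        # Every sample must carry all common primary key columns.
        key = {col: sample[col] for col in primary_key}
        for member in one_to_one:
            # Nested dict: exactly one row that shares the sample's key.
            rows[member].append({**key, **sample.get(member, {})})
        for member in one_to_many:
            # Nested list: zero or more rows that share the sample's key.
            rows[member].extend({**key, **row} for row in sample.get(member, []))
    return rows


overrides = [
    {"invoice_id": 1, "invoice": {"amount": 10.0}, "diagnosis": [{"code": "A"}, {"code": "B"}]},
    {"invoice_id": 2, "diagnosis": [{"code": "C"}]},
]
rows = split_samples(overrides, ["invoice_id"], {"invoice"}, {"diagnosis"})
print(rows["invoice"])  # [{'invoice_id': 1, 'amount': 10.0}, {'invoice_id': 2}]
```

Each sample corresponds to one primary key value, which is why the same list can describe members of different lengths (two invoices, three diagnoses) without the keys drifting apart.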

Returns:

A collection where all members (including optional ones) have been sampled according to the input parameters.

Attention:

In case the collection has members with a common primary key, the _preprocess_sample method must return distinct primary key values for each sample. The default implementation does this on a best-effort basis but may cause primary key violations. Hence, it is recommended to override this method and ensure that all primary key columns are set.

Raises:

ValueError: If the _preprocess_sample() method does not return all common primary key columns for all samples.

ValidationError: If the sampled members violate any of the collection filters. If the collection does not have filters, this error is never raised. To prevent validation errors, override the _preprocess_sample() method appropriately.

classmethod scan_delta(source: str | Path | deltalake.DeltaTable, *, validation: Validation = 'warn', **kwargs: Any) → Self

Lazily read all collection members from Delta Lake tables.

This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.

Args:

source: The location or DeltaTable to read from.

validation: The strategy for running validation when reading the data:

  • "allow": The method tries to read the collection schema from the Delta table metadata. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema does not match this schema, no metadata can be found, or the tables have conflicting metadata, this method automatically runs validate() with cast=True.

  • "warn": The method behaves like "allow". However, it prints a warning if validation is necessary.

  • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

  • "skip": The method never runs validation and simply reads the data, trusting the user that the data is valid. Use this option carefully.

kwargs: Additional keyword arguments passed directly to polars.scan_delta().

Returns:

The initialized collection.

Raises:

ValidationRequiredError: If no collection schema can be read from the source and validation is set to "forbid".

ValueError: If the provided source does not contain Delta tables for all required members.

Note:

Due to current limitations in dataframely, this method may read the Delta table into memory if validation is "warn" or "allow" and validation is required.

Attention:

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violations of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

Be aware that this method suffers from the same limitations as serialize().

classmethod scan_parquet(directory: str | Path, *, validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn', **kwargs: Any) → Self

Lazily read all collection members from parquet files in a directory.

This method searches for files named <member>.parquet in the provided directory for all required and optional members of the collection.

Args:

directory: The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.

validation: The strategy for running validation when reading the data:

  • "allow": The method tries to read the collection schema from the metadata of the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema does not match this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

  • "warn": The method behaves like "allow". However, it prints a warning if validation is necessary.

  • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

  • "skip": The method never runs validation and simply reads the data, trusting the user that the data is valid. Use this option carefully.

kwargs: Additional keyword arguments passed directly to polars.scan_parquet() for all members.

Returns:

The initialized collection.

Raises:

ValidationRequiredError: If no collection schema can be read from the directory and validation is set to "forbid".

ValueError: If the provided directory does not contain parquet files for all required members.

Note:

Due to current limitations in dataframely, this method actually reads the parquet files into memory if validation is "warn" or "allow" and validation is required.

Note:

This method is backward compatible with older versions of dataframely in which the schema metadata was saved to schema.json files instead of being encoded into the parquet files.

Attention:

Be aware that this method suffers from the same limitations as serialize().

classmethod serialize() → str

Serialize this collection to a JSON string.

This method does NOT serialize any data frames, but only the structure of the collection, similar to Schema.serialize().

Returns:

The serialized collection.

Note:

Serialization within dataframely itself will remain backwards-compatible at least within a major version. Until further notice, it will also be backwards-compatible across major versions.

Attention:

Serialization of polars expressions and lazy frames is not guaranteed to be stable across versions of polars. This affects collections with filters or members that define custom rules or columns with custom checks: a collection serialized with one version of polars may not be deserializable with another version of polars.

Attention:

This functionality is considered unstable. It may be changed at any time without it being considered a breaking change.

Raises:

TypeError: If a column of any member contains metadata that is not JSON-serializable.

ValueError: If a column of any member is not a “native” dataframely column type but a custom subclass.

sink_parquet(directory: str | Path, **kwargs: Any) → None

Stream the members of this collection into parquet files in a directory.

This method writes one parquet file per member into the provided directory. Each parquet file is named <member>.parquet. No file is written for optional members which are not provided in the current collection.

Args:

directory: The directory where the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.

kwargs: Additional keyword arguments passed directly to polars.LazyFrame.sink_parquet() of all members. The metadata keyword argument may only be provided if it is a dictionary.

Attention:

This method suffers from the same limitations as Schema.serialize().

to_dict() → dict[str, LazyFrame]

Return a dictionary representation of this collection.

classmethod validate(data: Mapping[str, FrameType], /, *, cast: bool = False) → Self

Validate that a set of data frames satisfies the collection’s invariants.

Args:

data: The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key.

cast: Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

Raises:

ValueError: If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.

ValidationError: If any of the input data frames does not satisfy its schema definition or the filters on this collection result in the removal of at least one row across any of the input data frames.

Returns:

An instance of the collection. All members of the collection are guaranteed to be valid with respect to their respective schemas and the filters on this collection did not remove rows from any member. The input order of each member is maintained.
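As the Raises section puts it, validate() accepts exactly those inputs from which filtering would remove no rows. The sketch below expresses that relationship with a generic filter function and a stand-in ValidationError; both the helper and the example filter are hypothetical, and the sketch makes no claim about how dataframely structures its code internally.

```python
from collections.abc import Callable
from typing import Any


class ValidationError(Exception):
    """Stand-in for dataframely's ValidationError (illustration only)."""


def validate_via_filter(
    filter_fn: Callable[[Any], tuple[Any, dict[str, list[Any]]]], data: Any
) -> Any:
    result, failures = filter_fn(data)
    removed = {name: len(rows) for name, rows in failures.items() if rows}
    if removed:
        # Any removed row turns filtering into a validation failure.
        raise ValidationError(f"rows would be removed: {removed}")
    return result


def drop_negative(data: dict[str, list[int]]) -> tuple[dict, dict]:
    # Hypothetical filter: a member row is "valid" if it is non-negative.
    kept = {name: [v for v in values if v >= 0] for name, values in data.items()}
    failed = {name: [v for v in values if v < 0] for name, values in data.items()}
    return kept, failed


print(validate_via_filter(drop_negative, {"first_member": [1, 2]}))  # {'first_member': [1, 2]}
```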

write_delta(target: str | Path | deltalake.DeltaTable, **kwargs: Any) → None

Write the members of this collection to Delta Lake tables.

This method writes each member to a Delta Lake table at the provided target location. The target can be a path, URI, or an existing DeltaTable object. No table is written for optional members which are not provided in the current collection.

Args:

target: The location or DeltaTable where the data should be written. If the location does not exist, it is created automatically, including all of its parents.

kwargs: Additional keyword arguments passed directly to polars.DataFrame.write_delta().

Attention:

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violations of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

This method suffers from the same limitations as Schema.serialize().
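The risk with mode="append" is that a constraint spanning several rows, such as primary key uniqueness or a group rule, can hold for each batch on its own but not for their union. The plain-Python illustration below uses hypothetical data and does not involve dataframely at all.

```python
from collections import Counter


def primary_key_unique(rows: list[dict]) -> bool:
    """Group rule: no invoice_id occurs more than once."""
    counts = Counter(row["invoice_id"] for row in rows)
    return all(n == 1 for n in counts.values())


existing = [{"invoice_id": 1}, {"invoice_id": 2}]
appended = [{"invoice_id": 2}, {"invoice_id": 3}]

print(primary_key_unique(existing))             # True
print(primary_key_unique(appended))             # True
print(primary_key_unique(existing + appended))  # False: invoice_id 2 occurs twice
```

Checking only the appended batch would pass, which is why such violations can only be caught by re-validating the combined table.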

write_parquet(directory: str | Path, **kwargs: Any) → None

Write the members of this collection to parquet files in a directory.

This method writes one parquet file per member into the provided directory. Each parquet file is named <member>.parquet. No file is written for optional members which are not provided in the current collection.

Args:

directory: The directory where the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.

kwargs: Additional keyword arguments passed directly to polars.DataFrame.write_parquet() of all members. The metadata keyword argument may only be provided if it is a dictionary.

Attention:

This method suffers from the same limitations as Schema.serialize().

dataframely.collection.deserialize_collection(data: str) → type[Collection]

Deserialize a collection from a JSON string.

This function allows dynamically loading a collection from its serialization without having to know the collection to load in advance.

Args:

data: The JSON string created via Collection.serialize().

Returns:

The collection loaded from the JSON data.

Raises:

ValueError: If the schema format version is not supported.

Attention:

The returned collection cannot be used to create instances of the collection as filters cannot be correctly recovered from the serialized format as of polars 1.31. Thus, you should only use static information from the returned collection.

Attention:

This functionality is considered unstable. It may be changed at any time without it being considered a breaking change.

See also:

Collection.serialize() for additional information on serialization.

dataframely.collection.read_parquet_metadata_collection(source: str | Path | IO[bytes] | bytes) → type[Collection] | None

Read a dataframely Collection type from the metadata of a parquet file.

Args:

source: Path to a parquet file or a file-like object that contains the metadata.

Returns:

The collection that was serialized to the metadata. None if no collection metadata is found or the deserialization fails.