SDK Reference

Data Connector

class DataClient(token: str, base_url: str, session: Session | None = None)[source]

Bases: object

Client to interact with the Data Platform API.

headers

Headers used in the request session.

create_dataset(repository_id: str, dataset: DatasetCreate) DataDataset[source]

Create a new dataset in a repository.

Parameters:
  • repository_id – Repository ID

  • dataset – DatasetCreate object

Returns:

The newly created DataDataset object.

create_repository(repository: DataRepositoryCreate) DataRepository[source]

Create a new repository.

Parameters:

repository – DataRepositoryCreate object

Returns:

The newly created DataRepository object.

create_stage(stage: DataStageCreate) DataStage[source]

Create a new stage.

Parameters:

stage – DataStageCreate object

Returns:

The newly created DataStage object.

delete_dataset(repository_id: str, dataset_id: str) None[source]

Delete a dataset by ID.

Parameters:
  • repository_id – Repository ID

  • dataset_id – Dataset ID

get_dataset(repository_id: str, dataset_id: str) DataDataset[source]

Get a dataset by ID.

Parameters:
  • repository_id – Repository ID

  • dataset_id – Dataset ID

Returns:

DataDataset object

get_file_from_stage(stage_id: str, file_id: str) BytesIO[source]

Get a file from a stage.

Parameters:
  • stage_id – Stage ID

  • file_id – File ID

Returns:

File contents as a BytesIO buffer.

get_repository(repository_id: str) DataRepository[source]

Get a repository by ID.

Parameters:

repository_id – Repository ID

Returns:

DataRepository object

get_stage(stage_id: str) DataStage[source]

Get a stage by ID.

Parameters:

stage_id – Stage ID

Returns:

DataStage object

list_datasets(repository_id: str, page: int = 0, size: int = 20) list[DataDataset][source]

List all the datasets in a repository.

Parameters:
  • repository_id – Repository ID

  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataDataset objects in the given repository

list_files_in_stage(stage_id: str, page: int = 0, size: int = 20) list[DataFile][source]

List all the files in a stage.

Parameters:
  • stage_id – Stage ID

  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataFile objects

list_repositories(page: int = 0, size: int = 20) list[DataRepository][source]

List all the repositories.

Parameters:
  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataRepository objects

list_stages(page: int = 0, size: int = 20) list[DataStage][source]

List all the stages.

Parameters:
  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataStage objects
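The list_* methods above all accept page and size. The following is a minimal sketch of walking every page, assuming that pages are numbered from 0 (as the documented default suggests) and that a page shorter than size marks the end; neither stopping rule is confirmed by this reference. iter_all and fake_list_repositories are hypothetical helpers; the latter stands in for a real call such as DataClient.list_repositories.

```python
from typing import Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_all(fetch_page: Callable[[int, int], Sequence[T]], size: int = 20) -> Iterator[T]:
    """Yield every item from a paginated list call, one page at a time."""
    page = 0  # assumption: the first page is 0, matching the documented default
    while True:
        items = fetch_page(page, size)
        yield from items
        if len(items) < size:  # assumption: a short or empty page is the last one
            return
        page += 1


# Hypothetical stand-in for:
#   lambda page, size: client.list_repositories(page=page, size=size)
_FAKE_ITEMS = [f"repo-{i}" for i in range(45)]


def fake_list_repositories(page: int, size: int) -> list[str]:
    return _FAKE_ITEMS[page * size:(page + 1) * size]


all_repos = list(iter_all(fake_list_repositories, size=20))
print(len(all_repos))
```

Methods that need extra arguments, such as list_datasets, can be wrapped in a lambda that fixes repository_id and forwards page and size.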

stream_dataset(repository_id: str, dataset_id: str) Iterator[Any][source]

Stream the data points of a dataset.

Parameters:
  • repository_id – Repository ID

  • dataset_id – Dataset ID

Returns:

Iterator over the data points of the dataset (each of type Any).
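Because stream_dataset returns an iterator, data points can be processed incrementally instead of being loaded all at once. A small sketch of grouping any iterator into fixed-size batches follows; batched is a hypothetical helper, and fake_stream stands in for a call such as client.stream_dataset(repository_id, dataset_id).

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def batched(items: Iterable[Any], batch_size: int) -> Iterator[list[Any]]:
    """Group an iterable into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    # islice pulls the next batch_size items; an empty list means the stream ended.
    while batch := list(islice(iterator, batch_size)):
        yield batch


def fake_stream() -> Iterator[dict[str, int]]:
    """Hypothetical stand-in for the iterator returned by stream_dataset."""
    for i in range(7):
        yield {"index": i}


batch_sizes = [len(batch) for batch in batched(fake_stream(), batch_size=3)]
print(batch_sizes)
```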

upload_file_to_stage(stage_id: str, file: DataFileCreate) DataFile[source]

Upload a file to a stage.

Parameters:
  • stage_id – Stage ID

  • file – DataFileCreate object

Returns:

The newly created DataFile object.

class DataDataset(*, repositoryId: str, datasetId: str, name: str | None = None, labels: list[str] | None = None, totalDatapoints: int, metadata: dict[str, Any] | None = None, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Dataset model.

Attributes:
  • repository_id – Repository ID that identifies the repository (group of datasets)

  • dataset_id – Dataset ID that identifies the dataset

  • name – Name of the dataset

  • labels – List of labels of the dataset

  • total_datapoints – Total number of units in the dataset

  • metadata – Metadata of the dataset

  • created_at – Datetime when the dataset was created

  • updated_at – Datetime when the dataset was updated

exception DataExternalServiceUnavailable(*args: object)[source]

Bases: DataError

Exception raised when an external service is unavailable.

DEFAULT_MESSAGE = 'External service unavailable: The external service is unavailable. '
add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class DataFile(*, fileId: str, stageId: str, name: str, createdAt: datetime, updatedAt: datetime, mediaType: str, size: int)[source]

Bases: BaseDataModel

class DataFileCreate(*, sourceData: BufferedReader | bytes, name: str)[source]

Bases: BaseDataModel

exception DataForbiddenError(*args: object)[source]

Bases: DataError

Exception raised when a forbidden error occurs.

DEFAULT_MESSAGE = 'Forbidden error: Client does not have permission to access the resource. '
add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception DataInternalError(*args: object)[source]

Bases: DataError

Exception raised when an internal error occurs.

DEFAULT_MESSAGE = 'Internal error: An unexpected error occurred. '
add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception DataInvalidInput(*args: object)[source]

Bases: DataError

Exception raised when the input is invalid.

DEFAULT_MESSAGE = 'Invalid input: The input provided is invalid. '
add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class DataRepository(*, repositoryId: str, name: str, mutable: bool, mediaType: Annotated[str, AfterValidator(func=media_type_validator)], modality: Modality, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Data Repository model.

Attributes:
  • repository_id – Repository ID that identifies the repository (group of datasets)

  • name – Name of the repository

  • mutable – Indicates if the datasets in the repository are mutable or not

  • media_type – Media type of the data: application/json, application/csv, etc.

  • modality – Modality of the data: image, text, etc.

  • created_at – Datetime when the repository was created

  • updated_at – Datetime when the repository was updated

class DataRepositoryCreate(*, name: str, mediaType: Annotated[str, AfterValidator(func=media_type_validator)], modality: Modality)[source]

Bases: BaseDataModel

Data Repository creation model.

Attributes:
  • name – Name of the repository

  • media_type – Media type of the data: application/json, application/csv, etc.

  • modality – Modality of the data: image, text, etc.

exception DataResourceNotFound(*args: object)[source]

Bases: DataError

Exception raised when a resource is not found.

DEFAULT_MESSAGE = 'Resource not found: The requested resource was not found. '
add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class DataStage(*, stageId: str, name: str, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Stage model.

Attributes:
  • stage_id – Stage ID that identifies the stage

  • name – Name of the stage

  • created_at – Datetime when the stage was created

  • updated_at – Datetime when the stage was updated

class DataStageCreate(*, name: str)[source]

Bases: BaseDataModel

Stage creation model.

Attributes:
  • name – Name of the stage

class DatasetCreate(*, sourceData: BufferedReader | bytes, name: str | None = None, labels: list[str], totalDatapoints: int, metadata: dict[str, Any] | None = None)[source]

Bases: BaseDataModel

Dataset creation model.

Attributes:
  • source_data – Source data of the dataset, as bytes or a file-like object

  • name – Name of the dataset

  • labels – List of labels of the dataset

  • total_datapoints – Total number of units in the dataset

  • metadata – Metadata of the dataset

Document Index Connector

class AsyncDocumentIndexClient(token: str, base_url: str)[source]

Bases: object

Asynchronous client for the Document Index, used to manage documents and run searches.

Document Index is a tool for managing collections of documents, enabling operations such as creation, deletion, listing, and searching. Documents can be stored either in the cloud or in a local deployment.

Example

>>> import os
>>> import asyncio
>>> from pharia_data_sdk.connectors import (
...     CollectionPath,
...     DocumentContents,
...     AsyncDocumentIndexClient,
...     DocumentPath,
...     SearchQuery,
... )
>>> async def main():
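...     # The second argument is base_url; DOCUMENT_INDEX_URL is a placeholder variable name.
...     async with AsyncDocumentIndexClient(os.getenv("AA_TOKEN"), os.getenv("DOCUMENT_INDEX_URL")) as document_index: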
...         collection_path = CollectionPath(
...             namespace="my-namespace", collection="previously-created-collection"
...         )
...         try:
...             search_result = await document_index.search(
...                 collection_path=collection_path,
...                 index_name="asymmetric",
...                 search_query=SearchQuery(
...                     query="What is the capital of Germany", max_results=4, min_score=0.5
...                 ),
...             )
...             print(search_result)
...         except Exception:
...             # some error handling here
...             pass
>>> asyncio.run(main())

async add_document(document_path: DocumentPath, contents: DocumentContents) None[source]

Add a document to a collection.

Note

If a document with the same document_path exists, it will be updated with the new contents.

Parameters:
  • document_path – Consists of collection_path and name of document to be created.

  • contents – Actual content of the document. Currently only supports text.

async assign_filter_index_to_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Assign an existing filter index to an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to assign the filter index to.

  • filter_index_name – Name of the filter index.

async assign_index_to_collection(collection_path: CollectionPath, index_name: str) None[source]

Assign an index to a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

async chunks(document_path: DocumentPath, index_name: str) Sequence[DocumentChunk][source]

Retrieve all chunks of an indexed document.

If the document is still indexing, a ResourceNotFound error is raised.

Parameters:
  • document_path – Path to the document.

  • index_name – Name of the index to retrieve chunks from.

Returns:

List of all chunks of the indexed document.

async create_collection(collection_path: CollectionPath) None[source]

Creates a collection at the path.

Note

Collection’s name must be unique within a namespace.

Parameters:

collection_path – Path to the collection of interest.

async create_filter_index_in_namespace(namespace: str, filter_index_name: str, field_name: str, field_type: Literal['string', 'integer', 'float', 'boolean', 'datetime']) None[source]

Create a filter index in a specified namespace.

Parameters:
  • namespace – The namespace in which to create the filter index.

  • filter_index_name – The name of the filter index to create.

  • field_name – The name of the field to index.

  • field_type – The type of the field to index.

Returns:

None

async create_index(index_path: IndexPath, index_configuration: IndexConfiguration) None[source]

Creates an index in a namespace.

Parameters:
  • index_path – Path to the index.

  • index_configuration – Configuration of the index to be created.

async delete_collection(collection_path: CollectionPath) None[source]

Deletes the collection at the path.

Parameters:

collection_path – Path to the collection of interest.

async delete_document(document_path: DocumentPath) None[source]

Delete a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be deleted.

async delete_filter_index_from_namespace(namespace: str, filter_index_name: str) None[source]

Delete a filter index from a namespace.

Parameters:
  • namespace – The namespace to delete the filter index from.

  • filter_index_name – The name of the filter index to delete.

async delete_index(index_path: IndexPath) None[source]

Delete an index in a namespace.

Parameters:

index_path – Path to the index.

async delete_index_from_collection(collection_path: CollectionPath, index_name: str) None[source]

Delete an index from a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

async document(document_path: DocumentPath) DocumentContents[source]

Retrieve a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be retrieved.

Returns:

Content of the retrieved document.

async documents(collection_path: CollectionPath, filter_query_params: DocumentFilterQueryParams | None = None) Sequence[DocumentInfo][source]

List all documents within a collection.

Note

Does not return each document’s content.

Parameters:
  • collection_path – Path to the collection of interest.

  • filter_query_params – Query parameters to filter the results.

Returns:

Overview of all documents within the collection.

async index_configuration(index_path: IndexPath) IndexConfiguration[source]

Retrieve the configuration of an index in a namespace given its name.

Parameters:

index_path – Path to the index.

Returns:

Configuration of the index.

async list_assigned_filter_index_names(collection_path: CollectionPath, index_name: str) Sequence[str][source]

List all filter-indexes assigned to a search index in a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Search index to check.

Returns:

List of all filter-indexes that are assigned to the search index.

async list_assigned_index_names(collection_path: CollectionPath) Sequence[str][source]

List all indexes assigned to a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

List of all indexes that are assigned to the collection.

async list_collections(namespace: str) Sequence[CollectionPath][source]

Lists all collections within a namespace.

Parameters:

namespace – Namespace whose collections are listed. Typically corresponds to an organization.

Returns:

List of all CollectionPath instances in the given namespace.

async list_filter_indexes_in_namespace(namespace: str) Sequence[str][source]

List all filter indexes in a namespace.

Parameters:

namespace – The namespace to list filter indexes in.

Returns:

List of all filter indexes in the namespace.

async list_indexes(namespace: str) Sequence[IndexPath][source]

Lists all indexes within a namespace.

Parameters:

namespace – Namespace whose indexes are listed. Typically corresponds to an organization.

Returns:

List of all IndexPath instances in the given namespace.

async list_namespaces() Sequence[str][source]

Lists all available namespaces.

Returns:

List of all available namespaces.

async progress(collection_path: CollectionPath) int[source]

Get the number of unembedded documents in a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

The number of unembedded documents in a collection.
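Since progress reports how many documents are still unembedded, it can be polled to wait for indexing to finish. The sketch below assumes only that the client exposes an awaitable progress(collection_path) returning an int, as documented above; wait_until_embedded and FakeAsyncClient are hypothetical names, and the polling interval and limit are arbitrary choices.

```python
import asyncio
from typing import Any


async def wait_until_embedded(
    document_index: Any,
    collection_path: Any,
    poll_interval: float = 1.0,
    max_polls: int = 60,
) -> bool:
    """Poll progress() until it reports 0 unembedded documents.

    Returns True once the count reaches 0, False if max_polls is exhausted.
    """
    for _ in range(max_polls):
        remaining = await document_index.progress(collection_path)
        if remaining == 0:
            return True
        await asyncio.sleep(poll_interval)
    return False


class FakeAsyncClient:
    """Hypothetical stand-in whose progress() counts down to 0."""

    def __init__(self, pending: int) -> None:
        self.pending = pending

    async def progress(self, collection_path: Any) -> int:
        current = self.pending
        self.pending = max(0, self.pending - 1)
        return current


finished = asyncio.run(
    wait_until_embedded(FakeAsyncClient(pending=3), "any-collection", poll_interval=0.0)
)
print(finished)
```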

async search(collection_path: CollectionPath, index_name: str, search_query: SearchQuery) Sequence[DocumentSearchResult][source]

Search through a collection with a search_query.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to search with.

  • search_query – The query to search with.

Returns:

Result of the search operation. Will be empty if nothing was retrieved.

async unassign_filter_index_from_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Unassign a filter index from an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to unassign the filter index from.

  • filter_index_name – Name of the filter index.

class CollectionPath(*, namespace: str, collection: str)[source]

Bases: BaseModel

Path to a collection.

Parameters:
  • namespace – Holds collections.

  • collection – Holds documents. Unique within a namespace.

exception ConstraintViolation(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the request cannot be processed as it would lead to an inconsistent state.

class DocumentIndexClient(token: str, base_url: str)[source]

Bases: object

Client for the Document Index, used to manage documents and run searches.

Document Index is a tool for managing collections of documents, enabling operations such as creation, deletion, listing, and searching. Documents can be stored either in the cloud or in a local deployment.

Parameters:
  • token – A valid token for the document index API.

  • base_url – The url of the document index API.

add_document(document_path: DocumentPath, contents: DocumentContents) None[source]

Add a document to a collection.

Note

If a document with the same document_path exists, it will be updated with the new contents.

Parameters:
  • document_path – Consists of collection_path and name of document to be created.

  • contents – Actual content of the document. Currently only supports text.

assign_filter_index_to_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Assign an existing filter index to an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to assign the filter index to.

  • filter_index_name – Name of the filter index.

assign_index_to_collection(collection_path: CollectionPath, index_name: str) None[source]

Assign an index to a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

chunks(document_path: DocumentPath, index_name: str) Sequence[DocumentChunk][source]

Retrieve all chunks of an indexed document.

If the document is still indexing, a ResourceNotFound error is raised.

Parameters:
  • document_path – Path to the document.

  • index_name – Name of the index to retrieve chunks from.

Returns:

List of all chunks of the indexed document.
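Because chunks raises ResourceNotFound while a document is still indexing, callers may want a bounded retry. The following sketch takes the fetch call and the exception type as parameters so that it stays self-contained; chunks_when_ready, FakeResourceNotFound, and fake_fetch are hypothetical names, and the attempt count and delay are arbitrary.

```python
import time
from typing import Any, Callable, Sequence, Type


def chunks_when_ready(
    fetch_chunks: Callable[[], Sequence[Any]],
    not_found_error: Type[Exception],
    attempts: int = 5,
    delay_seconds: float = 1.0,
) -> Sequence[Any]:
    """Call fetch_chunks until it stops raising not_found_error."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return fetch_chunks()
        except not_found_error as error:
            last_error = error
            if attempt < attempts - 1:
                time.sleep(delay_seconds)
    # Still failing after the last attempt: surface the original error.
    raise last_error


class FakeResourceNotFound(Exception):
    """Hypothetical stand-in for the SDK's ResourceNotFound."""


calls = {"count": 0}


def fake_fetch() -> list[str]:
    """Fails twice as if the document were still indexing, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeResourceNotFound("document is still indexing")
    return ["chunk one", "chunk two"]


result = chunks_when_ready(fake_fetch, FakeResourceNotFound, attempts=5, delay_seconds=0.0)
print(result, calls["count"])
```

With a real client, fetch_chunks would be something like lambda: document_index.chunks(document_path, index_name). Keeping the retry bounded matters because ResourceNotFound can also indicate a missing document or missing permissions, which no amount of waiting resolves.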

create_collection(collection_path: CollectionPath) None[source]

Creates a collection at the path.

Note

Collection’s name must be unique within a namespace.

Parameters:

collection_path – Path to the collection of interest.

create_filter_index_in_namespace(namespace: str, filter_index_name: str, field_name: str, field_type: Literal['string', 'integer', 'float', 'boolean', 'datetime']) None[source]

Create a filter index in a specified namespace.

Parameters:
  • namespace – The namespace in which to create the filter index.

  • filter_index_name – The name of the filter index to create.

  • field_name – The name of the field to index.

  • field_type – The type of the field to index.

create_index(index_path: IndexPath, index_configuration: IndexConfiguration) None[source]

Creates an index in a namespace.

Parameters:
  • index_path – Path to the index.

  • index_configuration – Configuration of the index to be created.

delete_collection(collection_path: CollectionPath) None[source]

Deletes the collection at the path.

Parameters:

collection_path – Path to the collection of interest.

delete_document(document_path: DocumentPath) None[source]

Delete a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be deleted.

delete_filter_index_from_namespace(namespace: str, filter_index_name: str) None[source]

Delete a filter index from a namespace.

Parameters:
  • namespace – The namespace to delete the filter index from.

  • filter_index_name – The name of the filter index to delete.

delete_index(index_path: IndexPath) None[source]

Delete an index in a namespace.

Parameters:

index_path – Path to the index.

delete_index_from_collection(collection_path: CollectionPath, index_name: str) None[source]

Delete an index from a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

document(document_path: DocumentPath) DocumentContents[source]

Retrieve a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be retrieved.

Returns:

Content of the retrieved document.

documents(collection_path: CollectionPath, filter_query_params: DocumentFilterQueryParams | None = None) Sequence[DocumentInfo][source]

Lists the information of documents in a collection. This includes the document name, creation timestamp and version number.

Note

This does not return document contents.

Parameters:
  • collection_path – Path to the collection of interest.

  • filter_query_params – Query parameters to filter the results.

Returns:

Information of documents in the collection.

index_configuration(index_path: IndexPath) IndexConfiguration[source]

Retrieve the configuration of an index in a namespace given its name.

Parameters:

index_path – Path to the index.

Returns:

Configuration of the index.

list_assigned_filter_index_names(collection_path: CollectionPath, index_name: str) Sequence[str][source]

List all filter-indexes assigned to a search index in a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Search index to check.

Returns:

List of all filter-indexes that are assigned to the search index.

list_assigned_index_names(collection_path: CollectionPath) Sequence[str][source]

List all indexes assigned to a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

List of all indexes that are assigned to the collection.

list_collections(namespace: str) Sequence[CollectionPath][source]

Lists all collections within a namespace.

Parameters:

namespace – Namespace whose collections are listed. Typically corresponds to an organization.

Returns:

List of all CollectionPath instances in the given namespace.

list_filter_indexes_in_namespace(namespace: str) Sequence[str][source]

List all filter indexes in a namespace.

Parameters:

namespace – The namespace to list filter indexes in.

Returns:

List of all filter indexes in the namespace.

list_indexes(namespace: str) Sequence[IndexPath][source]

Lists all indexes within a namespace.

Parameters:

namespace – Namespace whose indexes are listed. Typically corresponds to an organization.

Returns:

List of all IndexPath instances in the given namespace.

list_namespaces() Sequence[str][source]

Lists all available namespaces.

Returns:

List of all available namespaces.

progress(collection_path: CollectionPath) int[source]

Get the number of unembedded documents in a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

The number of unembedded documents in a collection.

search(collection_path: CollectionPath, index_name: str, search_query: SearchQuery) Sequence[DocumentSearchResult][source]

Search through a collection with a search_query.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to search with.

  • search_query – The query to search with.

Returns:

Result of the search operation. Will be empty if nothing was retrieved.

unassign_filter_index_from_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Unassign a filter index from an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to unassign the filter index from.

  • filter_index_name – Name of the filter index.

exception DocumentIndexError(message: str, status_code: HTTPStatus)[source]

Bases: RuntimeError

Raised in case of any DocumentIndexClient-related errors.

message

The error message as returned by the Document Index.

status_code

The http error code.
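Since every DocumentIndexError carries a message and a status_code, callers can branch on the status rather than on individual subclasses. The sketch below uses a stand-in class that mirrors the documented (message, status_code) signature; FakeDocumentIndexError and is_retryable are hypothetical, and treating 5xx responses as transient is an assumed policy, not something this reference prescribes.

```python
from http import HTTPStatus


class FakeDocumentIndexError(RuntimeError):
    """Hypothetical stand-in mirroring the documented (message, status_code) signature."""

    def __init__(self, message: str, status_code: HTTPStatus) -> None:
        super().__init__(message)
        self.message = message
        self.status_code = status_code


def is_retryable(error: FakeDocumentIndexError) -> bool:
    """Assumed policy: only server-side (5xx) failures are worth retrying."""
    return 500 <= error.status_code < 600


errors = [
    FakeDocumentIndexError("collection not found", HTTPStatus.NOT_FOUND),
    FakeDocumentIndexError("embedding service down", HTTPStatus.SERVICE_UNAVAILABLE),
]
decisions = [(error.status_code.value, is_retryable(error)) for error in errors]
print(decisions)
```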

class DocumentInfo(*, document_path: DocumentPath, created: datetime, version: int)[source]

Bases: BaseModel

Information about a document.

Parameters:
  • document_path – Path to the document. The path uniquely identifies the document among all managed documents.

  • created – When this version of the document was created. Equivalent to when it was last updated.

  • version – The version of the document, i.e., how many times the document was updated.

classmethod from_list_documents_response(list_documents_response: Mapping[str, Any]) DocumentInfo[source]

class DocumentPath(*, collection_path: CollectionPath, document_name: str)[source]

Bases: BaseModel

Path to a document.

Parameters:
  • collection_path – Path to a collection.

  • document_name – Points to a document. Unique within a collection.

encoded_document_name() str[source]

classmethod from_json(document_path_json: Mapping[str, str]) DocumentPath[source]

classmethod from_slash_separated_str(path: str) DocumentPath[source]

to_slash_separated_str() str[source]

class DocumentSearchResult(*, document_path: DocumentPath, section: str, score: float, chunk_position: DocumentTextPosition)[source]

Bases: BaseModel

Result of a search query for one individual section.

Parameters:
  • document_path – Path to the document that the section originates from.

  • section – Actual section of the document that was found as a match to the query.

  • score – Search score of the found section. Will be between 0 and 1. Higher scores correspond to better matches. The score depends on the index configuration, e.g. the score of a section differs for hybrid and non-hybrid indexes. For searches on hybrid indexes, the score can exceed the min_score of the query as the min_score only applies to the similarity score.

exception ExternalServiceUnavailable(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when an external service is unavailable while the request is executed.

class FilterField(*, field_name: Annotated[str, StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=1000, pattern=^[\w-]+(\.\d{0,5})?[\w-]*$)], field_value: str | int | float | bool | datetime, criteria: FilterOps)[source]

Bases: BaseModel

Represents a field to filter on in the DocumentIndex metadata.

classmethod validate_and_convert_datetime(v: str | int | float | bool | datetime) str | int | float | bool[source]

Validate field_value and convert datetime to RFC3339 format with Z suffix.

Parameters:

v – The value to be validated and converted.

Returns:

The validated and converted value.
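To illustrate what an RFC 3339 timestamp with a Z suffix looks like, here is a minimal standard-library sketch. It is not the SDK's validator: to_rfc3339_z is a hypothetical helper, and both the treatment of naive datetimes as UTC and the dropping of fractional seconds are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone


def to_rfc3339_z(value: datetime) -> str:
    """Format a datetime as an RFC 3339 UTC timestamp ending in 'Z'."""
    if value.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    # Convert to UTC first so that the 'Z' suffix is truthful.
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


utc_plus_two = timezone(timedelta(hours=2))
stamp = to_rfc3339_z(datetime(2024, 5, 1, 14, 30, 0, tzinfo=utc_plus_two))
print(stamp)
```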

class FilterOps(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration of possible filter operations.

class Filters(*, filter_type: Literal['with', 'without', 'with_one_of'], fields: list[FilterField])[source]

Bases: BaseModel

Represents a set of filters to apply to a search query.

class IndexConfiguration(*, chunk_overlap: Annotated[int, Ge(ge=0)] = 0, chunk_size: Annotated[int, Gt(gt=0), Le(le=2046)], hybrid_index: Literal['bm25'] | None = None, embedding: SemanticEmbed | InstructableEmbed)[source]

Bases: BaseModel

Configuration of an index.

Parameters:
  • chunk_overlap – The maximum number of tokens of overlap between consecutive chunks. Must be less than chunk_size.

  • chunk_size – The maximum size of the chunks in tokens to be used for the index.

  • hybrid_index – If set to “bm25”, combine vector search and keyword search (bm25) results.

  • embedding – Configuration for the embedding of chunks.
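The interplay of chunk_size and chunk_overlap can be pictured as a sliding window over tokens. The sketch below is a simplified illustration, not the Document Index's actual chunking logic: chunk_tokens is a hypothetical helper operating on a plain list of strings, and it always uses the maximum overlap, whereas the parameter is documented as an upper bound.

```python
def chunk_tokens(tokens: list[str], chunk_size: int, chunk_overlap: int = 0) -> list[list[str]]:
    """Split tokens into windows of chunk_size that share chunk_overlap tokens."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be greater than 0")
    if not 0 <= chunk_overlap < chunk_size:
        # With overlap >= size the window would never advance.
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the input
    return chunks


tokens = [f"t{i}" for i in range(10)]
chunks = chunk_tokens(tokens, chunk_size=4, chunk_overlap=1)
for chunk in chunks:
    print(chunk)
```

The check on chunk_overlap mirrors the documented rule that it must be less than chunk_size: each window advances by chunk_size minus chunk_overlap tokens, so an overlap equal to the size would make no progress.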

validate_chunk_overlap() Self[source]

class IndexPath(*, namespace: str, index: str)[source]

Bases: BaseModel

Path to an index.

Parameters:
  • namespace – The namespace to which this index belongs.

  • index – The name of the index.

class InstructableEmbed(*, strategy: Literal['instructable_embed'] = 'instructable_embed', model_name: str, query_instruction: str = '', document_instruction: str = '')[source]

Bases: BaseModel

Instructable embedding configuration.

Parameters:
  • model_name – Name of the model to use.

  • query_instruction – Instruction to apply when embedding queries.

  • document_instruction – Instruction to apply when embedding documents.

exception InternalError(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised in case of unexpected errors.

exception InvalidInput(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the user-input could not be processed as it violates pre-conditions.

exception ResourceNotFound(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when a resource like a namespace or a document cannot be found.

Note that this can also mean that the user executing the request does not have permission to access the resource.

class SearchQuery(*, query: str, max_results: Annotated[int, Ge(ge=0)] = 1, min_score: Annotated[float, Ge(ge=-1.0), Le(le=1.0)] = 0.0, filters: list[Filters] | None = None)[source]

Bases: BaseModel

Query to search through a collection with.

Parameters:
  • query – Actual text to be searched with.

  • max_results – Maximum number of search results to retrieve. The field constraint in the signature accepts values greater than or equal to 0. Defaults to 1.

  • min_score – Filter out results with a similarity score below this value. Must be between -1 and 1. For searches on hybrid indexes, the Document Index applies the min_score to the semantic results before fusion of result sets. As fusion re-scores results, returned scores may exceed this value.
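The effect of min_score and max_results on a non-hybrid result set can be pictured with a small client-side sketch. The Document Index applies these limits on the server; apply_query_limits and the sample scores below are hypothetical and only illustrate the documented semantics of dropping results below min_score and returning at most max_results.

```python
def apply_query_limits(
    scored: list[tuple[str, float]],
    max_results: int = 1,
    min_score: float = 0.0,
) -> list[tuple[str, float]]:
    """Keep results scoring at least min_score, best first, capped at max_results."""
    kept = [item for item in scored if item[1] >= min_score]
    kept.sort(key=lambda item: item[1], reverse=True)
    return kept[:max_results]


candidates = [("doc-a", 0.82), ("doc-b", 0.41), ("doc-c", 0.67), ("doc-d", 0.55)]
top = apply_query_limits(candidates, max_results=2, min_score=0.5)
print(top)
```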

class SemanticEmbed(*, strategy: Literal['semantic_embed'] = 'semantic_embed', model_name: str, representation: Literal['symmetric', 'asymmetric'])[source]

Bases: BaseModel

Semantic embedding configuration.

Parameters:
  • model_name – Name of the model to use.

  • representation – The embedding representation to use: “symmetric” or “asymmetric”. Use “symmetric” when queries and documents are of the same kind, e.g., for classification tasks. Use “asymmetric” when queries and documents differ in kind, e.g., for search tasks.

Retrievers

class AsyncBaseRetriever[source]

Bases: ABC, Generic[ID]

General interface for any asynchronous retriever.

Asynchronous retrievers are used to find texts given a user query. Each Retriever implementation owns its own logic for retrieval. For comparison purposes, we assume scores in the SearchResult instances to be between 0 and 1.

abstract async get_full_document(id: ID) Document | None[source]

abstract async get_relevant_documents_with_scores(query: str) Sequence[SearchResult][source]

class AsyncDocumentIndexRetriever(document_index: AsyncDocumentIndexClient, index_name: str, namespace: str, collection: str, k: int = 1, threshold: float = 0.0)[source]

Bases: AsyncBaseRetriever[DocumentPath]

async get_full_document(id: DocumentPath) Document[source]

async get_relevant_documents_with_scores(query: str, filters: list[Filters] | None = None) Sequence[SearchResult[DocumentPath]][source]

class BaseRetriever[source]

Bases: ABC, Generic[ID]

General interface for any retriever.

Retrievers are used to find texts given a user query. Each Retriever implementation owns its own logic for retrieval. For comparison purposes, we assume scores in the SearchResult instances to be between 0 and 1.

abstract get_full_document(id: ID) Document | None[source]

abstract get_relevant_documents_with_scores(query: str) Sequence[SearchResult][source]

class Document(*, text: str, metadata: Any = None)[source]

Bases: BaseModel

A document.

text

The document’s text.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

class DocumentChunk(*, text: str, start: int, end: int, metadata: Any = None)[source]

Bases: BaseModel

Part of a Document, specifically for retrieval use cases.

text

Chunk of the document that matched the search query.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

start

Start index of the chunk within the document

Type:

int

end

End index of the chunk within the document

Type:

int
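
How start and end relate to the chunk text can be sketched as follows. This assumes the indices are character offsets with an exclusive end, as in Python slicing; the reference above does not state this, so treat it as an assumption. DocumentChunk here is a stand-in dataclass and split_into_chunks is a hypothetical helper, neither is part of the SDK.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class DocumentChunk:
    # Stand-in mirroring the documented fields; not the SDK class.
    text: str
    start: int
    end: int
    metadata: Any = None


def split_into_chunks(text: str, size: int) -> List[DocumentChunk]:
    """Hypothetical helper: cut text into fixed-size character windows."""
    return [
        DocumentChunk(text=text[i : i + size], start=i, end=min(i + size, len(text)))
        for i in range(0, len(text), size)
    ]


document_text = "Summer is warm. Rain is cold."
chunks = split_into_chunks(document_text, 16)
for chunk in chunks:
    # Under the assumed convention, slicing the document reproduces the chunk text.
    assert document_text[chunk.start : chunk.end] == chunk.text
```
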

class DocumentIndexRetriever(document_index: DocumentIndexClient, index_name: str, namespace: str, collection: str, k: int = 1, threshold: float = 0.0)[source]

Bases: BaseRetriever[DocumentPath]

Search through documents within collections in the DocumentIndexClient.

This retriever lets you search for relevant documents in the given Document Index collection.

Example

>>> import os
>>> from pharia_data_sdk.connectors import DocumentIndexClient, DocumentIndexRetriever
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> retriever = DocumentIndexRetriever(document_index, "asymmetric", "aleph-alpha", "wikipedia-de", 3)
>>> documents = retriever.get_relevant_documents_with_scores("Who invented the airplane?")

get_full_document(id: DocumentPath) Document[source]
get_relevant_documents_with_scores(query: str, filters: list[Filters] | None = None) Sequence[SearchResult[DocumentPath]][source]
class HybridQdrantInMemoryRetriever(documents: Sequence[Document], k: int, client: AlephAlphaClientProtocol | None = None, threshold: float = 0.0, retriever_type: RetrieverType = RetrieverType.ASYMMETRIC, distance_metric: Distance = Distance.COSINE, sparse_model_name: str = 'Qdrant/bm25', max_workers: int = 10)[source]

Bases: QdrantInMemoryRetriever

Search through documents stored in memory using hybrid (keyword + semantic) search.

This retriever uses a [Qdrant](https://github.com/qdrant/qdrant)-in-Memory vector store instance to store documents and their asymmetric embeddings. When run, the given query is embedded using both a dense and sparse embedding model and scored against the documents in the collection to find the most relevant documents. Finally, the retrievals are fused using the Reciprocal Rank Fusion algorithm.

Parameters:
  • documents – The sequence of documents to be made searchable.

  • k – The (top) number of documents to be returned by search.

  • client – Aleph Alpha client instance for running model related API calls. Defaults to LimitedConcurrencyClient.from_env().

  • threshold – The minimum value of the fusion rank score (combined cosine similarity and keyword similarity). Defaults to 0.0.

  • retriever_type – The type of retriever to be instantiated. Should be ASYMMETRIC for most query-document retrieval use cases; SYMMETRIC is optimized for similar-document retrieval. Defaults to ASYMMETRIC.

  • distance_metric – The distance metric to be used for vector comparison. Defaults to Distance.COSINE.

  • sparse_model_name – The name of the sparse embedding model from fastembed to be used. Defaults to “Qdrant/bm25”.

  • max_workers – The maximum number of workers to use for concurrent processing. Defaults to 10.

Example

>>> from pharia_inference_sdk.connectors import LimitedConcurrencyClient, Document, HybridQdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text=t) for t in ["I do not like rain.", "Summer is warm.", "We are so back."]]
>>> retriever = HybridQdrantInMemoryRetriever(documents, 5, client=client)
>>> query = "Do you like summer?"
>>> documents = retriever.get_relevant_documents_with_scores(query)
get_filtered_documents_with_scores(query: str, filter: Filter | None) Sequence[SearchResult[int]][source]

Retrieves documents that match the given query and filter conditions, using hybrid search.

This method performs a hybrid search by embedding the query into dense and sparse vectors. It then executes search requests for both vector types and combines the results using the Reciprocal Rank Fusion algorithm.

Parameters:
  • query – The text query to search for.

  • filter – If not None, a filter to apply to the search results.

Returns:

All documents that correspond to the query and pass the filter, sorted by their reciprocal rank fusion score.

get_full_document(id: int) Document | None
get_relevant_documents_with_scores(query: str) Sequence[SearchResult[int]][source]

Search for relevant documents given a query using hybrid search (dense + sparse retrieval).

This method performs a hybrid search by embedding the query into dense and sparse vectors. It then executes search requests for both vector types and combines the results using the Reciprocal Rank Fusion algorithm.

Parameters:

query – The text to be searched with.

Returns:

All documents that correspond to the query, sorted by their reciprocal rank fusion score.
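
The fusion step can be sketched in isolation. The function below is the textbook form of Reciprocal Rank Fusion, where each document receives the sum of 1 / (k + rank) over every ranking it appears in. The function name, the constant k = 60 and the sample rankings are illustrative assumptions; the retriever delegates fusion to Qdrant, whose constant and exact scores may differ.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence, Tuple


def reciprocal_rank_fusion(
    rankings: Sequence[Sequence[Hashable]], k: int = 60
) -> List[Tuple[Hashable, float]]:
    """Fuse several ranked ID lists into one list sorted by RRF score."""
    scores: Dict[Hashable, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based: the top hit of each list contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


dense_ranking = [1, 0, 2]   # document IDs ordered by dense (semantic) similarity
sparse_ranking = [0, 2, 3]  # document IDs ordered by sparse (keyword) similarity
fused = reciprocal_rank_fusion([dense_ranking, sparse_ranking])
print([doc_id for doc_id, _ in fused])  # → [0, 2, 1, 3]
```

Documents 0 and 2 appear in both rankings and therefore outrank document 1, even though document 1 is the top dense hit.
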

class QdrantInMemoryRetriever(documents: Sequence[Document], k: int, client: AlephAlphaClientProtocol | None = None, threshold: float = 0.5, retriever_type: RetrieverType = RetrieverType.ASYMMETRIC, distance_metric: Distance = Distance.COSINE)[source]

Bases: BaseRetriever[int]

Search through documents stored in memory using semantic search.

This retriever uses a [Qdrant](https://github.com/qdrant/qdrant)-in-Memory vector store instance to store documents and their asymmetric embeddings. When run, the given query is embedded and scored against the document embeddings to retrieve the k-most similar matches by cosine similarity.

Parameters:
  • documents – The sequence of documents to be made searchable.

  • k – The (top) number of documents to be returned by search.

  • client – Aleph Alpha client instance for running model related API calls.

  • threshold – The minimum value of cosine similarity between the query vector and the document vector.

  • retriever_type – The type of retriever to be instantiated. Should be ASYMMETRIC for most query-document retrieval use cases; SYMMETRIC is optimized for similar-document retrieval.

  • distance_metric – The distance metric to be used for vector comparison.

Example

>>> from pharia_data_sdk.connectors import LimitedConcurrencyClient, Document, QdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text=t) for t in ["I do not like rain.", "Summer is warm.", "We are so back."]]
>>> retriever = QdrantInMemoryRetriever(documents, 5, client=client)
>>> query = "Do you like summer?"
>>> documents = retriever.get_relevant_documents_with_scores(query)
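
How k and threshold interact can be sketched without a vector store. This is a plain-Python illustration under stated assumptions, not the retriever's implementation: the vectors are hand-written stand-ins for model embeddings, top_k is a hypothetical helper, and whether the real threshold comparison is inclusive is an assumption.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    # A zero vector has no direction; treat its similarity as 0.
    return dot / norm if norm else 0.0


def top_k(
    query_vec: Sequence[float],
    doc_vecs: Sequence[Sequence[float]],
    k: int,
    threshold: float,
) -> List[Tuple[int, float]]:
    """Return up to k (index, score) pairs with score >= threshold, best first."""
    scored = [(i, cosine_similarity(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    # Assumption: the threshold is inclusive.
    kept = [(i, score) for i, score in scored if score >= threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:k]


doc_vecs = [[1.0, 0.0], [0.6, 0.8], [0.0, 1.0]]
hits = top_k([1.0, 0.0], doc_vecs, k=5, threshold=0.5)
print([i for i, _ in hits])  # → [0, 1]
```

With k = 5 but only two documents above the threshold, two results come back: k is an upper bound, and the threshold can shrink the result set further.
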
get_filtered_documents_with_scores(query: str, filter: Filter) Sequence[SearchResult[int]][source]

Specific method for InMemoryRetriever to support filtering search results.

Parameters:
  • query – The text to be searched with.

  • filter – Conditions to filter by.

Returns:

All documents that correspond to the query and pass the filter.

get_full_document(id: int) Document | None[source]
get_relevant_documents_with_scores(query: str) Sequence[SearchResult[int]][source]
class RetrieverType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Specify the type of retriever to instantiate.

ASYMMETRIC

Query is embedded as Query and each document as Document.

SYMMETRIC

Both query and documents will be embedded as Symmetric.

class SearchResult(*, id: ID, score: float, document_chunk: DocumentChunk)[source]

Bases: BaseModel, Generic[ID]

Contains a text alongside its search score.

id

Unique identifier of the document

Type:

pharia_data_sdk.connectors.retrievers.base_retriever.ID

score

The similarity score between the text and the query that was searched with. Will be between 0 and 1, where 0 means no similarity and 1 perfect similarity.

Type:

float

document_chunk

The document chunk found by search.

Type:

pharia_data_sdk.connectors.retrievers.base_retriever.DocumentChunk