
Datapool

The Datapool functionality can be used to manage a large-volume queue of items that need to be processed and monitored granularly.

In this section you will find more details on how to interact with this BotCity Orchestrator resource through the SDK.

BotCity Orchestrator

You can use the Datapool functionality directly on the BotCity Orchestrator platform.

See more at:

Create a Datapool

You can create the structure of a new Datapool and configure it through the SDK.

You need the following information:

  • Label (label): Unique identifier of the Datapool.
  • Name (name): Friendly display name of the Datapool.
  • Reprocessing (max_auto_retry): Maximum number of automatic reprocessing attempts when an item fails.
  • Abort on error (max_errors_before_inactive): Maximum number of consecutive errors before the Datapool is deactivated.
  • Processing time (item_max_processing_time): Expected time, in minutes, to process one item.
  • Schema (schema): List of fields that make up the Datapool, each defined with the SchemaField class:
    • label: Field identifier.
    • type: Data type, defined by FieldType: TEXT, INTEGER or DOUBLE.
    • unique_id: Defines it as a unique ID: True or False.
    • display_value: Defines it as visible to users: True or False.

Create via Orchestrator interface

Although the Maestro SDK can create a Datapool from code, we recommend using the Orchestrator web interface, which lets you configure each required parameter in a more intuitive and detailed way.

See more at:

from botcity.maestro import *

# Schema example with fields: 'id', 'name' and 'price'

product_id = SchemaField(
    label="id",
    type=FieldType.TEXT,
    unique_id=True,
    display_value=True
)

product_name = SchemaField(
    label="name",
    type=FieldType.TEXT,
    unique_id=False,
    display_value=True
)

product_price = SchemaField(
    label="price",
    type=FieldType.DOUBLE,
    unique_id=False,
    display_value=False
)

# List with the fields
schema = [product_id, product_name, product_price]

# Creating the Datapool object
datapool = DataPool(
    label="ProductsData",
    name="ProductsData",
    max_auto_retry=2,              # Reprocessing
    max_errors_before_inactive=5,  # Abort on error
    item_max_processing_time=3,    # Processing time, in minutes
    schema=schema
)

# Creating the Datapool structure in the Orchestrator
# (assumes `maestro` is an authenticated BotMaestroSDK instance)
maestro.create_datapool(datapool)
// Not yet implemented
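Before sending a schema to the Orchestrator, you may want to sanity-check it locally. The following stdlib-only sketch is hypothetical: `Field` and `Kind` are local mirrors of `SchemaField` and `FieldType` written for illustration (they are not SDK classes), and the requirement that field labels be distinct is an assumption based on item values being addressed by label.

```python
from dataclasses import dataclass
from enum import Enum


class Kind(Enum):
    # Local mirror of the three FieldType values listed above (not the SDK enum)
    TEXT = "TEXT"
    INTEGER = "INTEGER"
    DOUBLE = "DOUBLE"


@dataclass
class Field:
    # Hypothetical local mirror of SchemaField's four attributes
    label: str
    type: Kind
    unique_id: bool = False
    display_value: bool = True


def schema_problems(fields):
    """Return a list of human-readable problems found in a schema definition."""
    problems = []
    seen = set()
    for field in fields:
        if not field.label:
            problems.append("field with an empty label")
        elif field.label in seen:
            # Assumption: labels must be distinct because item values are keyed by label
            problems.append(f"duplicate label: {field.label!r}")
        seen.add(field.label)
        if not isinstance(field.type, Kind):
            problems.append(f"{field.label!r} has an unsupported type: {field.type!r}")
    return problems
```

A schema that declares two fields labelled `id` would be reported as `["duplicate label: 'id'"]`, while a definition mirroring the ProductsData schema yields an empty list.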

Datapool operations

You can perform the following operations on a created Datapool:

  • Get: Returns a reference to the Datapool.
    • Label: Identifier of the Datapool to retrieve.
  • Is active: Checks whether the retrieved Datapool is active: True or False.
  • Is empty: Checks whether there are no pending items in the retrieved Datapool: True or False.
  • Has next: Checks whether there is a next pending item in the queue in the retrieved Datapool: True or False.
  • Activate: Activates the retrieved Datapool.
  • Deactivate: Deactivates the retrieved Datapool.

Usage examples:

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Checking if the Datapool is active
print(datapool.is_active())

# Checking if the Datapool has no pending items
print(datapool.is_empty())

# Checking if there is a next pending item
print(datapool.has_next())

# Marking the Datapool as active
datapool.activate()

# Marking the Datapool as inactive
datapool.deactivate()

// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Checking if the Datapool is active
Console.WriteLine(await datapool.IsActiveAsync());

// Checking if the Datapool is empty
Console.WriteLine(await datapool.IsEmptyAsync());

// Marking the Datapool as active
await datapool.ActivateAsync();

// Marking the Datapool as inactive
await datapool.DeactivateAsync();
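The operations above can be combined into a small pre-flight check before a consumer starts. This is a sketch, not part of the SDK: `ensure_ready` is a hypothetical helper that only calls the documented `is_active()`, `activate()` and `has_next()` methods, and `FakeDatapool` is a local stand-in so the snippet runs without an Orchestrator connection. Whether a consumer should reactivate a Datapool that was deactivated (for example, by the abort-on-error limit) is a policy decision, so here it is an explicit opt-in flag.

```python
class FakeDatapool:
    """Local stand-in exposing the same method names as the SDK's Datapool reference."""

    def __init__(self, active, pending):
        self._active = active
        self._pending = pending

    def is_active(self):
        return self._active

    def activate(self):
        self._active = True

    def has_next(self):
        return self._pending > 0


def ensure_ready(datapool, reactivate=False):
    """Return True if the Datapool is active and has a pending item to consume.

    Hypothetical helper: it reactivates an inactive Datapool only when asked to,
    since the Datapool may have been deactivated on purpose (e.g. after too many errors).
    """
    if not datapool.is_active():
        if not reactivate:
            return False
        datapool.activate()
    return datapool.has_next()
```

With the real SDK, the argument would be the reference returned by `maestro.get_datapool("ProductsData")`.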

Item operations

With the Datapool created, you can perform operations on items. See the available actions below.

Add new items

You can add items directly through an automation or script using the SDK.

You need the following information:

  • Label: Identifier of the target Datapool.
  • Item: Input values for the new item, defined with the DataPoolEntry class:
    • values: Dictionary mapping each field label of the Datapool's schema to its value.

Tip

For example, you can have one script that inserts items into the Datapool and another that consumes and processes them.

Example of adding an item:

# Instantiating a new Datapool item based on the defined Schema
new_item = DataPoolEntry(
    values={
        "id": "Electronic#001",
        "name": "Smartphone",
        "price": "2000"
    }
)

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Adding a new item
datapool.create_entry(new_item)

// Instantiating a new Datapool item based on the defined Schema
var values = new Dictionary<string, object>
{
    { "id", "Electronic#001" },
    { "name", "Smartphone" },
    { "price", 2000 }
};
DatapoolEntry new_item = new DatapoolEntry(0, values);

// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Adding a new item
await datapool.CreateEntryAsync(new_item);
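Following the tip above, a producer script often reads its input from a file. The sketch below turns CSV rows into the `values` dictionaries that `DataPoolEntry` expects, converting each column according to its schema type. It uses only the standard library; `FIELD_TYPES` and `rows_to_values` are hypothetical names, and whether your Datapool should receive numbers or strings for numeric fields is an assumption to verify (the examples on this page pass `price` both as a string and as a number).

```python
import csv
import io

# Hypothetical mapping of field labels to schema types (mirrors the ProductsData schema)
FIELD_TYPES = {"id": "TEXT", "name": "TEXT", "price": "DOUBLE"}

# Converter per schema type; TEXT values are kept as strings
CONVERTERS = {"TEXT": str, "INTEGER": int, "DOUBLE": float}


def rows_to_values(csv_text, field_types=FIELD_TYPES):
    """Yield one values dict per CSV row, keeping only the schema fields.

    Raises ValueError if a field is missing from a row or cannot be converted.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        values = {}
        for label, kind in field_types.items():
            raw = row.get(label)
            if raw is None:
                # Column absent from the header, or row shorter than the header
                raise ValueError(f"missing value for field {label!r}")
            values[label] = CONVERTERS[kind](raw.strip())
        yield values
```

With the real SDK, each dictionary would then be wrapped as `DataPoolEntry(values=values)` and passed to `datapool.create_entry(...)`, as in the example above. Because malformed rows raise `ValueError`, the producer can log and skip them instead of inserting bad items.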

Cancel items

You can cancel items that are pending processing in the queue. A cancelled item is not processed, but it remains in the Datapool history.

You need the following information:

  • Label: Datapool reference.
  • Item ID (entry_id): Unique identifier of the item.
  • Completion message (finish_message): (optional) Cancellation message.

Item state

Only items with a PENDING state can be cancelled.

Example of cancelling an item:

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Cancelling an item that is pending in the queue
datapool.cancel_entry(
    entry_id="<ENTRY_ID>",
    finish_message="Item with missing data"
)

// Not yet implemented

Delete items

You can delete items that are in the Datapool queue.

You need the following information:

  • Label: Datapool reference.
  • Item ID (entry_id): Unique identifier of the item.

Item state

Items with PROCESSING or TIMEOUT state cannot be deleted.

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Removing an item from the queue
datapool.delete_entry(entry_id="<ENTRY_ID>")
// Not yet implemented
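The state rules from the cancel and delete sections can be expressed as simple checks, for example to filter a list of item IDs before calling `cancel_entry()` or `delete_entry()`. This is a local sketch: the function names are hypothetical, the state names are the ones mentioned on this page, any state other than PROCESSING or TIMEOUT is assumed deletable (as the rule above implies), and how your script obtains each item's current state is outside its scope.

```python
# Item states mentioned on this page
PENDING, PROCESSING, TIMEOUT, DONE, ERROR = "PENDING", "PROCESSING", "TIMEOUT", "DONE", "ERROR"

# Only pending items can be cancelled
CANCELLABLE = {PENDING}

# Items being processed or that timed out cannot be deleted
NOT_DELETABLE = {PROCESSING, TIMEOUT}


def can_cancel(state):
    return state in CANCELLABLE


def can_delete(state):
    return state not in NOT_DELETABLE


def split_by_rule(states_by_id, rule):
    """Split {entry_id: state} into (allowed_ids, rejected_ids) according to rule."""
    allowed, rejected = [], []
    for entry_id, state in states_by_id.items():
        (allowed if rule(state) else rejected).append(entry_id)
    return allowed, rejected
```

Only the IDs in the first list would then be passed to `cancel_entry(entry_id=...)` or `delete_entry(entry_id=...)`.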

Get items

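You can retrieve item references, either to process the items or to check their values.

You need the following information:

  • Label: Datapool reference.
  • Task ID (task_id): Identifier of the task that will process the item, required by next().
  • Item ID (entry_id): Unique identifier of the item, required by get_entry().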

Attention!

The next() method changes an item's state from PENDING to PROCESSING.

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Gets the next item from the Datapool
item_1 = datapool.next(task_id="<TASK_ID>")

# get_entry() retrieves a specific item by its ID
item_2 = datapool.get_entry(entry_id="<ENTRY_ID>")

// Gets the next item from the Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Getting the value of a specific field from the item
string item_data = await item.GetValueAsync("data-label");
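Because `next()` moves the returned item from PENDING to PROCESSING, a second consumer should not receive the same item, and a consumer can get `None` when another one claimed the last pending item first (which is why the consumption loop later on this page checks for `None`). The toy in-memory queue below illustrates that idea with a lock. It models the semantics described on this page only and makes no claim about how the Orchestrator implements them; `ToyQueue` and `consume` are hypothetical names.

```python
import threading


class ToyQueue:
    """In-memory illustration of next(): claim one PENDING item, or return None."""

    def __init__(self, entry_ids):
        self._lock = threading.Lock()
        self.states = {entry_id: "PENDING" for entry_id in entry_ids}
        self.owner = {}

    def next(self, task_id):
        # The check and the state change happen under one lock,
        # so an item can never be handed to two tasks
        with self._lock:
            for entry_id, state in self.states.items():
                if state == "PENDING":
                    self.states[entry_id] = "PROCESSING"
                    self.owner[entry_id] = task_id
                    return entry_id
            return None


def consume(queue, task_id, claimed):
    # Keep claiming until no pending item is left
    while (entry_id := queue.next(task_id)) is not None:
        claimed.append((task_id, entry_id))
```

Running several `consume` threads against one `ToyQueue` claims every item exactly once, after which `next()` returns `None` for everyone.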

Access item values

After obtaining an item reference, you can read its values using the field labels defined in the Schema.

You need the following information:

  • Item: Reference of the retrieved item.
  • Label: Field identifier defined in the Schema.

# Getting the value of a specific field from the item
item_data = item["data-label"]

# The get_value() method is equivalent
item_data = item.get_value("data-label")

// Gets the next item from the Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Getting the value of a specific field from the item
string item_data = await item.GetValueAsync("data-label");
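Depending on how items were inserted, a numeric field may come back as text (an assumption worth checking against your own Datapool). A small hypothetical helper can read a field by label and convert it, raising a clear error when the data is malformed, which typically points to a business problem rather than a system failure. Any object that supports `item[label]` works, so a plain dictionary stands in for the item here.

```python
def read_field(item, label, convert=str):
    """Read item[label] and convert it, e.g. read_field(item, "price", float).

    Hypothetical helper: raises ValueError naming the field if the value is
    empty or cannot be converted.
    """
    raw = item[label]
    if raw is None or (isinstance(raw, str) and not raw.strip()):
        raise ValueError(f"field {label!r} is empty")
    try:
        return convert(raw)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"field {label!r} has invalid value {raw!r}") from exc
```

For example, `read_field({"price": "2000"}, "price", float)` returns `2000.0`.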

Report the completion state

Reporting the completion state of each item is essential for it to be counted correctly.

Each processed item can be finalized with a state of DONE or ERROR.

You need the following information:

  • Item: Reference of the retrieved item.
  • Success (report_done()): Reports the DONE state.
  • Error (report_error()): Reports the ERROR state, defining the type:
    • ErrorType.SYSTEM: indicates a system error.
    • ErrorType.BUSINESS: indicates a business error.
  • Message (finish_message): (optional) Custom completion message.

Error type

If the error type is not specified in the report, the error is treated as SYSTEM.

Items reported with BUSINESS errors are not reprocessed automatically and do not count toward the abort-on-error limit; only SYSTEM errors are considered in those scenarios.

# Gets the next available item from the Datapool
item = datapool.next(task_id="<TASK_ID>")

# Report exactly one completion state per item, for example:

# Finalizing as 'DONE' after processing
item.report_done(finish_message="Processed successfully!")

# ...or finalizing item processing indicating a system error
item.report_error(error_type=ErrorType.SYSTEM, finish_message="System unavailable.")

# ...or finalizing item processing indicating a business error
item.report_error(error_type=ErrorType.BUSINESS, finish_message="Invalid data.")

// Gets the next item from the Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Report exactly one completion state per item, for example:

// Finalizing as 'DONE' after processing
await item.ReportDoneAsync();

// ...or finalizing item processing as 'ERROR'
await item.ReportErrorAsync();
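To make the error type rules concrete, the toy simulation below replays reports and applies them as this page describes: only SYSTEM errors trigger automatic reprocessing (up to `max_auto_retry`) and count toward the consecutive-error limit (`max_errors_before_inactive`). It is a local sketch, not Orchestrator code, and it rests on assumptions the page does not state: a DONE report resets the consecutive-error count, a BUSINESS error neither resets nor increments it, and the Datapool is deactivated as soon as the count reaches the limit.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPolicy:
    max_auto_retry: int
    max_errors_before_inactive: int
    consecutive_system_errors: int = 0
    active: bool = True
    retries: dict = field(default_factory=dict)

    def report(self, entry_id, outcome):
        """Apply one report ("DONE", "BUSINESS" or "SYSTEM") and return the item's next state."""
        if outcome == "DONE":
            # Assumption: a successful item resets the consecutive-error count
            self.consecutive_system_errors = 0
            return "DONE"
        if outcome == "BUSINESS":
            # BUSINESS errors are never retried and do not count toward the limit
            return "ERROR"
        # SYSTEM error (also the default when no type is given): counts toward abort on error...
        self.consecutive_system_errors += 1
        if self.consecutive_system_errors >= self.max_errors_before_inactive:
            self.active = False
        # ...and the item goes back to the queue while retry attempts remain
        used = self.retries.get(entry_id, 0)
        if used < self.max_auto_retry:
            self.retries[entry_id] = used + 1
            return "PENDING"
        return "ERROR"
```

With the ProductsData settings (`max_auto_retry=2`, `max_errors_before_inactive=5`), an item that fails with a SYSTEM error three times in a row ends in ERROR after two retries, while a BUSINESS error ends in ERROR immediately without affecting the counter.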

Suggested Datapool consumption structure

The Datapool can be consumed in various ways using the item operation methods.

The following example consumes a sequence of items within a single task:

# Consuming the next available item and reporting the completion state at the end
# (assumes `maestro` is an authenticated BotMaestroSDK instance)
datapool = maestro.get_datapool(label="Items-To-Process")

while datapool.has_next():
    # Gets the next item from the Datapool
    item = datapool.next(task_id="<TASK_ID>")
    if item is None:
        # The item could be 'None' if another process consumed it first
        break

    try:
        # Processing the item...

        # Finalizing as 'DONE' after processing
        item.report_done(finish_message="Processed successfully!")

    except Exception:
        # Finalizing item processing as 'ERROR' (SYSTEM type by default)
        item.report_error(finish_message="Processing failed.")

// Consuming the next available item and reporting the completion state at the end
Datapool datapool = await maestro.GetDatapoolAsync("Items-To-Process");

while (await datapool.HasNextAsync()) {
    // Gets the next item from the Datapool
    DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);
    if (item == null) {
        // The item could be 'null' if another process consumed it first
        break;
    }

    try {
        // Processing the item...

        // Finalizing as 'DONE' after processing
        await item.ReportDoneAsync();

    } catch (Exception) {
        // Finalizing item processing as 'ERROR'
        await item.ReportErrorAsync();
    }
}
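One way to make sure every item receives a completion report is to wrap its processing in a context manager. The sketch below is hypothetical: `reported` and `BusinessRuleError` are names introduced here, and `ErrorType` and `FakeItem` are local stand-ins so the snippet runs without the SDK. With the real SDK you would use the SDK's `ErrorType` and the item returned by `next()`; the `report_done()` / `report_error()` calls use the parameters shown earlier on this page. After reporting, the exception is suppressed so the loop can move on to the next item.

```python
from contextlib import contextmanager
from enum import Enum


class ErrorType(Enum):
    # Local stand-in for the SDK's ErrorType
    SYSTEM = "SYSTEM"
    BUSINESS = "BUSINESS"


class BusinessRuleError(Exception):
    """Hypothetical exception for problems in the item's data rather than in the system."""


class FakeItem:
    """Records reports instead of sending them to the Orchestrator."""

    def __init__(self):
        self.reports = []

    def report_done(self, finish_message=None):
        self.reports.append(("DONE", finish_message))

    def report_error(self, error_type=ErrorType.SYSTEM, finish_message=None):
        self.reports.append((error_type.value, finish_message))


@contextmanager
def reported(item):
    """Report DONE on success, BUSINESS for BusinessRuleError, SYSTEM for any other error."""
    try:
        yield item
    except BusinessRuleError as exc:
        item.report_error(error_type=ErrorType.BUSINESS, finish_message=str(exc))
    except Exception as exc:
        item.report_error(error_type=ErrorType.SYSTEM, finish_message=str(exc))
    else:
        item.report_done(finish_message="Processed successfully!")
```

Inside the consumption loop above, the `try`/`except` block would become `with reported(item):` followed by the processing code, so no code path can leave an item without a report.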

Warning

Always report the completion state of every item your code processes.

This is essential for item states to be updated in the Datapool in the BotCity Orchestrator.