There is no built-in waiter configuration for Kinesis Data Analytics (KDA) applications, so I recently had to write the config for one myself. The code below can be saved to a file and imported into a script as a module.
KDA waiter
Click to expand
from botocore.waiter import WaiterModel, create_waiter_with_client
class KdaWaiter:
    """
    Class that contains all KDA waiter config & functions.

    Each public method builds a custom botocore waiter definition, creates
    the waiter against ``boto_client`` and blocks until the polled status
    reaches the desired value.

    Attributes are deliberately named after the API parameters they are
    passed as (``ApplicationName``, ``IncludeAdditionalDetails``,
    ``SnapshotName``).
    """

    # JMESPath expressions evaluated against the describe_* responses.
    _APP_STATUS_PATH = "ApplicationDetail.ApplicationStatus"
    _SNAPSHOT_STATUS_PATH = "SnapshotDetails.SnapshotStatus"

    def __init__(self, app, snapshot_name, boto_client):
        """
        Args:
            app: Name of the KDA application to poll.
            snapshot_name: Name of the application snapshot to poll
                (only used by ``snapshot_waiter``).
            boto_client: Client object the waiters are created with; it must
                expose ``describe_application`` and
                ``describe_application_snapshot``.
        """
        self.ApplicationName = app
        self.IncludeAdditionalDetails = False
        self.SnapshotName = snapshot_name
        self.boto_client = boto_client

    @staticmethod
    def _build_config(waiter_name, operation, status_path, transitions,
                      delay, max_attempts):
        """Build a version-2 waiter config with one "path" acceptor per
        ``(expected_status, state)`` pair in ``transitions`` (order kept)."""
        return {
            "version": 2,
            "waiters": {
                waiter_name: {
                    "operation": operation,
                    "delay": delay,  # Number of seconds to delay
                    "maxAttempts": max_attempts,  # Max attempts before failure
                    "acceptors": [
                        {
                            "matcher": "path",
                            "expected": expected_status,
                            "argument": status_path,
                            "state": state,
                        }
                        for expected_status, state in transitions
                    ],
                }
            },
        }

    def _wait(self, waiter_name, waiter_config, **api_kwargs):
        """Create the named waiter from ``waiter_config``, block on it with
        ``api_kwargs`` as the operation parameters, and return the waiter."""
        waiter = create_waiter_with_client(
            waiter_name, WaiterModel(waiter_config), self.boto_client)
        waiter.wait(**api_kwargs)
        return waiter

    def kda_running(self, *, delay=15, max_attempts=30):
        """
        Waits for KDA app to reach "RUNNING" status
        The apps must be in this status to be worked with

        "STARTING" and "UPDATING" are treated as retry states.

        Args:
            delay: Seconds to sleep between polls (default 15).
            max_attempts: Maximum number of polls before giving up
                (default 30).

        Returns:
            The waiter object, after it has finished waiting successfully.

        Raises:
            botocore.exceptions.WaiterError: raised by the waiter when the
                attempts are exhausted (see the botocore waiter docs).
        """
        waiter_name = "KdaRunning"
        waiter_config = self._build_config(
            waiter_name,
            "describe_application",
            self._APP_STATUS_PATH,
            [
                ("STARTING", "retry"),
                ("UPDATING", "retry"),
                ("RUNNING", "success"),
            ],
            delay,
            max_attempts,
        )
        return self._wait(
            waiter_name,
            waiter_config,
            ApplicationName=self.ApplicationName,
            IncludeAdditionalDetails=self.IncludeAdditionalDetails,
        )

    def kda_stopped(self, *, delay=15, max_attempts=30):
        """
        Waits for KDA app to reach "READY" status
        This status is when the app is disabled/stopped. Calling it ready is
        a bit of a misnomer

        "STOPPING" and "FORCE_STOPPING" are treated as retry states.

        Args:
            delay: Seconds to sleep between polls (default 15).
            max_attempts: Maximum number of polls before giving up
                (default 30).

        Returns:
            The waiter object, after it has finished waiting successfully.

        Raises:
            botocore.exceptions.WaiterError: raised by the waiter when the
                attempts are exhausted (see the botocore waiter docs).
        """
        waiter_name = "KdaStopped"
        waiter_config = self._build_config(
            waiter_name,
            "describe_application",
            self._APP_STATUS_PATH,
            [
                ("STOPPING", "retry"),
                ("FORCE_STOPPING", "retry"),
                ("READY", "success"),
            ],
            delay,
            max_attempts,
        )
        return self._wait(
            waiter_name,
            waiter_config,
            ApplicationName=self.ApplicationName,
            IncludeAdditionalDetails=self.IncludeAdditionalDetails,
        )

    def snapshot_waiter(self, *, delay=5, max_attempts=30):
        """
        Waits for snapshot of app to reach "READY" status
        This means the snapshot has been created successfully

        "CREATING" is a retry state; "FAILED" is a failure state.

        Args:
            delay: Seconds to sleep between polls (default 5).
            max_attempts: Maximum number of polls before giving up
                (default 30).

        Returns:
            The waiter object, after it has finished waiting successfully.

        Raises:
            botocore.exceptions.WaiterError: raised by the waiter when the
                snapshot status is "FAILED" or the attempts are exhausted
                (see the botocore waiter docs).
        """
        waiter_name = "SnapshotReady"
        waiter_config = self._build_config(
            waiter_name,
            "describe_application_snapshot",
            self._SNAPSHOT_STATUS_PATH,
            [
                ("FAILED", "failure"),
                ("CREATING", "retry"),
                ("READY", "success"),
            ],
            delay,
            max_attempts,
        )
        return self._wait(
            waiter_name,
            waiter_config,
            ApplicationName=self.ApplicationName,
            SnapshotName=self.SnapshotName,
        )