-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdata-firehose-helpers.ts
More file actions
116 lines (100 loc) · 3.42 KB
/
data-firehose-helpers.ts
File metadata and controls
116 lines (100 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import {
DescribeDeliveryStreamCommand,
DescribeDeliveryStreamCommandOutput,
ExtendedS3DestinationUpdate,
FirehoseClient,
UpdateDestinationCommand,
} from '@aws-sdk/client-firehose';
import { FIREHOSE_STREAM_NAME, REGION } from 'constants/backend-constants';
export async function alterFirehoseBufferIntervals(bufferIntervalConfig: {
expected: {
destination: number;
processor: number;
};
update: {
destination: number;
processor: number;
};
}) {
const client = new FirehoseClient({ region: REGION });
const deliveryStreamDetails: DescribeDeliveryStreamCommandOutput =
await client.send(
new DescribeDeliveryStreamCommand({
DeliveryStreamName: FIREHOSE_STREAM_NAME,
}),
);
const destinations =
deliveryStreamDetails.DeliveryStreamDescription?.Destinations ?? [];
if (destinations.length !== 1) {
throw new Error('expected a single delivery destination');
}
const destination = destinations[0];
const currentDestinationBufferInterval =
destination.ExtendedS3DestinationDescription?.BufferingHints
?.IntervalInSeconds;
if (
currentDestinationBufferInterval !==
bufferIntervalConfig.expected.destination
) {
throw new Error(
`Expected destination buffer size to be ${bufferIntervalConfig.expected.destination} - got ${currentDestinationBufferInterval} - cannot safely alter, has the default value changed in code or manually?`,
);
}
const processors =
destination.ExtendedS3DestinationDescription?.ProcessingConfiguration
?.Processors;
if (processors?.length !== 1) {
throw new Error('Expected one processor to be configured');
}
const processor = processors[0];
const currentProcessorBufferInterval = processor.Parameters?.find(
(p) => p.ParameterName === 'BufferIntervalInSeconds',
)?.ParameterValue;
const otherParams =
processor.Parameters?.filter(
(p) => p.ParameterName !== 'BufferIntervalInSeconds',
) ?? [];
if (
currentProcessorBufferInterval !==
bufferIntervalConfig.expected.processor.toString()
) {
throw new Error(
`Expected processor buffer size to be ${bufferIntervalConfig.expected.processor} - got ${currentProcessorBufferInterval} - cannot safely alter, has the default value changed in code or manually?`,
);
}
const destinationId = destination.DestinationId;
if (!destinationId) {
throw new Error('Destination ID not found');
}
const updatedDestinationConfig: ExtendedS3DestinationUpdate = {
...destination.ExtendedS3DestinationDescription,
BufferingHints: {
...destination.ExtendedS3DestinationDescription?.BufferingHints,
IntervalInSeconds: bufferIntervalConfig.update.destination,
},
ProcessingConfiguration: {
...destination.ExtendedS3DestinationDescription?.ProcessingConfiguration,
Processors: [
{
...processor,
Parameters: [
...otherParams,
{
ParameterName: 'BufferIntervalInSeconds',
ParameterValue: bufferIntervalConfig.update.processor.toString(),
},
],
},
],
},
};
await client.send(
new UpdateDestinationCommand({
DeliveryStreamName: FIREHOSE_STREAM_NAME,
DestinationId: destinationId,
CurrentDeliveryStreamVersionId:
deliveryStreamDetails.DeliveryStreamDescription?.VersionId,
ExtendedS3DestinationUpdate: updatedDestinationConfig,
}),
);
}