Skip to content

Commit 425bc11

Browse files
author
romankoshelev
committed
feat postgresql: set application_name in pg connection
commit_hash:2f2343a69963b45718bb2a11838b32d76c0ee708
1 parent 2d9e398 commit 425bc11

6 files changed

Lines changed: 66 additions & 19 deletions

File tree

postgresql/include/userver/storages/postgres/options.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ struct ConnectionSettings {
296296
/// Helps keep track of the changes in settings
297297
SettingsVersion version{0U};
298298

299+
std::optional<std::string> application_name{};
300+
299301
bool operator==(const ConnectionSettings& rhs) const {
300302
return !RequiresConnectionReset(rhs) && recent_errors_threshold == rhs.recent_errors_threshold;
301303
}
@@ -308,7 +310,7 @@ struct ConnectionSettings {
308310
ignore_unused_query_params != rhs.ignore_unused_query_params ||
309311
max_prepared_cache_size != rhs.max_prepared_cache_size || pipeline_mode != rhs.pipeline_mode ||
310312
max_ttl != rhs.max_ttl || discard_on_connect != rhs.discard_on_connect ||
311-
omit_describe_mode != rhs.omit_describe_mode;
313+
omit_describe_mode != rhs.omit_describe_mode || application_name != rhs.application_name;
312314
}
313315
};
314316

postgresql/src/storages/postgres/component.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,6 @@ properties:
146146
- auto
147147
- manual
148148
description: how to learn the `max_pool_size`
149+
application_name:
150+
type: string
151+
description: will be displayed in postgres statistics and logs

postgresql/src/storages/postgres/detail/connection_impl.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,13 @@ void ConnectionImpl::AsyncConnect(const Dsn& dsn, engine::Deadline deadline) {
315315
// some allowance.
316316
auto timeout = std::chrono::duration_cast<std::chrono::milliseconds>(deadline.TimeLeft());
317317
deadline = testsuite_pg_ctl_.MakeExecuteDeadline(timeout);
318-
conn_wrapper_.AsyncConnect(dsn, deadline, scope);
318+
const std::string client_encoding = "UTF8";
319+
const PGConnectionWrapper::ConnectParams start_params{
320+
/* .dsn = */ dsn,
321+
/* .application_name = */ settings_.application_name,
322+
/* .client_encoding = */ client_encoding,
323+
};
324+
conn_wrapper_.AsyncConnect(start_params, deadline, scope);
319325
conn_wrapper_.FillSpanTags(span, {timeout, GetStatementTimeout()});
320326

321327
const auto session_id = ExecuteCommandNoPrepare(kGetSessionIdQuery, deadline).AsSingleRow<std::string>();
@@ -325,8 +331,16 @@ void ConnectionImpl::AsyncConnect(const Dsn& dsn, engine::Deadline deadline) {
325331
// We cannot handle exceptions here, so we let them got to the caller
326332
if (settings_.discard_on_connect == ConnectionSettings::kDiscardAll) {
327333
ExecuteCommandNoPrepare("DISCARD ALL", deadline);
334+
SetParameter("client_encoding", client_encoding, Connection::ParameterScope::kSession, deadline);
335+
if (start_params.application_name) {
336+
SetParameter(
337+
"application_name",
338+
*start_params.application_name,
339+
Connection::ParameterScope::kSession,
340+
deadline
341+
);
342+
}
328343
}
329-
SetParameter("client_encoding", "UTF8", Connection::ParameterScope::kSession, deadline);
330344
RefreshReplicaState(deadline);
331345
SetConnectionStatementTimeout(GetDefaultCommandControl().statement_timeout_ms, deadline);
332346
if (settings_.user_types != ConnectionSettings::kPredefinedTypesOnly) {

postgresql/src/storages/postgres/detail/pg_connection_wrapper.cpp

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <storages/postgres/detail/pg_connection_wrapper.hpp>
22

3+
#include <boost/container/small_vector.hpp>
4+
35
#include <pg_config.h>
46

57
#ifndef USERVER_NO_LIBPQ_PATCHES
@@ -297,49 +299,65 @@ engine::Task PGConnectionWrapper::Cancel() {
297299
});
298300
}
299301

300-
void PGConnectionWrapper::AsyncConnect(const Dsn& dsn, Deadline deadline, tracing::ScopeTime& scope) {
301-
PGCW_LOG_DEBUG() << "Connecting to " << DsnCutPassword(dsn);
302+
void PGConnectionWrapper::AsyncConnect(const ConnectParams& params, Deadline deadline, tracing::ScopeTime& scope) {
303+
PGCW_LOG_DEBUG() << "Connecting to " << DsnCutPassword(params.dsn);
302304

303-
auto options = OptionsFromDsn(dsn);
305+
auto options = OptionsFromDsn(params.dsn);
304306
log_extra_.Extend(tracing::kDatabaseInstance, std::move(options.dbname));
305307
log_extra_.Extend(tracing::kPeerAddress, std::move(options.host) + ':' + options.port);
306308

307309
scope.Reset(scopes::kLibpqConnect);
308-
StartAsyncConnect(dsn);
310+
StartAsyncConnect(params);
309311
scope.Reset(scopes::kLibpqWaitConnectFinish);
310-
WaitConnectionFinish(deadline, dsn);
311-
PGCW_LOG_DEBUG() << "Connected to " << DsnCutPassword(dsn);
312+
WaitConnectionFinish(deadline, params.dsn);
313+
PGCW_LOG_DEBUG() << "Connected to " << DsnCutPassword(params.dsn);
312314
}
313315

314-
void PGConnectionWrapper::StartAsyncConnect(const Dsn& dsn) {
316+
void PGConnectionWrapper::StartAsyncConnect(const ConnectParams& params) {
315317
if (conn_) {
316318
PGCW_LOG_LIMITED_ERROR()
317319
<< "Attempt to connect a connection that is already connected" << logging::LogExtra::Stacktrace();
318-
throw ConnectionFailed{dsn, "Already connected"};
320+
throw ConnectionFailed{params.dsn, "Already connected"};
319321
}
320322

321323
// PQconnectStart() may access /etc/hosts, ~/.pgpass, /etc/passwd, etc.
322-
engine::CriticalAsyncNoSpan(bg_task_processor_, [&dsn, this] {
323-
conn_ = PQconnectStart(dsn.GetUnderlying().c_str());
324+
engine::CriticalAsyncNoSpan(bg_task_processor_, [&params, this] {
325+
boost::container::small_vector<const char*, 8> keywords{"dbname"};
326+
boost::container::small_vector<const char*, 8> values{params.dsn.GetUnderlying().c_str()};
327+
328+
if (params.application_name) {
329+
keywords.push_back("application_name");
330+
values.push_back(params.application_name->c_str());
331+
}
332+
333+
if (params.client_encoding) {
334+
keywords.push_back("client_encoding");
335+
values.push_back(params.client_encoding->c_str());
336+
}
337+
338+
keywords.push_back(nullptr);
339+
values.push_back(nullptr);
340+
341+
conn_ = PQconnectStartParams(keywords.data(), values.data(), true);
324342
}).Get();
325343

326344
if (!conn_) {
327345
// The only reason the pointer cannot be null is that libpq failed
328346
// to allocate memory for the structure
329347
PGCW_LOG_LIMITED_ERROR() << "libpq failed to allocate a PGconn structure" << logging::LogExtra::Stacktrace();
330-
throw ConnectionFailed{dsn, "Failed to allocate PGconn structure"};
348+
throw ConnectionFailed{params.dsn, "Failed to allocate PGconn structure"};
331349
}
332350

333351
const auto status = PQstatus(conn_);
334352
if (CONNECTION_BAD == status) {
335-
const std::string msg = MsgForStatus(status);
353+
const std::string msg = fmt::format("{}: {}", MsgForStatus(status), PQerrorMessage(conn_));
336354
PGCW_LOG_WARNING() << msg;
337-
CloseWithError(ConnectionFailed{dsn, msg});
355+
CloseWithError(ConnectionFailed{params.dsn, msg});
338356
} else {
339357
PGCW_LOG_TRACE() << MsgForStatus(status);
340358
}
341359

342-
RefreshSocket(dsn);
360+
RefreshSocket(params.dsn);
343361

344362
// set this as early as possible to avoid dumping notices to stderr
345363
PQsetNoticeReceiver(conn_, &NoticeReceiver, this);

postgresql/src/storages/postgres/detail/pg_connection_wrapper.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ class PGConnectionWrapper {
3030
using Duration = Deadline::TimePoint::clock::duration;
3131
using ResultHandle = detail::ResultWrapper::ResultHandle;
3232

33+
// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
34+
struct ConnectParams final {
35+
Dsn dsn;
36+
std::optional<std::string> application_name;
37+
std::optional<std::string> client_encoding;
38+
};
39+
3340
PGConnectionWrapper(
3441
engine::TaskProcessor& tp,
3542
concurrent::BackgroundTaskStorageCore& bts,
@@ -53,7 +60,7 @@ class PGConnectionWrapper {
5360
///
5461
/// Start asynchronous connection and wait for it's completion (suspending
5562
/// current coroutine)
56-
void AsyncConnect(const Dsn& dsn, Deadline deadline, tracing::ScopeTime&);
63+
void AsyncConnect(const ConnectParams& params, Deadline deadline, tracing::ScopeTime&);
5764

5865
/// @brief Causes a connection to enter pipeline mode.
5966
///
@@ -154,7 +161,7 @@ class PGConnectionWrapper {
154161
private:
155162
PGTransactionStatusType GetTransactionStatus() const;
156163

157-
void StartAsyncConnect(const Dsn& dsn);
164+
void StartAsyncConnect(const ConnectParams& params);
158165

159166
/// @throws ConnectionTimeoutError if was awakened by the deadline
160167
void WaitConnectionFinish(Deadline deadline, const Dsn& dsn);

postgresql/src/storages/postgres/postgres_config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <storages/postgres/experiments.hpp>
88
#include <userver/storages/postgres/component.hpp>
99
#include <userver/storages/postgres/exceptions.hpp>
10+
#include <userver/utils/userver_info.hpp>
1011

1112
#include <userver/formats/common/items.hpp>
1213

@@ -97,6 +98,8 @@ ConnectionSettings ParseConnectionSettings(const ConfigType& config) {
9798
? ConnectionSettings::kDiscardAll
9899
: ConnectionSettings::kDiscardNone;
99100
settings.deadline_propagation_enabled = config["deadline-propagation-enabled"].template As<bool>(true);
101+
settings.application_name =
102+
config["application_name"].template As<std::string>(USERVER_NAMESPACE::utils::GetUserverIdentifier());
100103

101104
return settings;
102105
}

0 commit comments

Comments
 (0)