Long running queries / connection leak #104

Open
jimmyff opened this issue Oct 2, 2019 · 14 comments


jimmyff commented Oct 2, 2019

I'm running into a strange issue using this package with GCP Cloud SQL.

If I generate a lot of traffic, I start to see timeout errors in places I really wouldn't expect them (e.g. a simple insert statement). After running under high load for a while, I then start getting this error:
Caught connect error, PostgreSQLSeverity.fatal 53300: remaining connection slots are reserved for non-replication superuser connections , #0 PostgreSQLConnection.open (package:postgres/src/connection.dart:133:7)

When I look at pg_stat_activity, I see many connections running the insert with wait_event = "ClientRead" and state = "idle in transaction". I'm not sure how I could be failing to close a transaction.

I added a timeoutInSeconds to both queries; however, now the connection pool is filling up with the delete queries, which show wait_event = "tuple" and state = "active".

It sounds like I have a connection leak, but I'm not sure how. I've included my code below.

/// Request should provide the AccountReplicateMessage object as its body
Future<shelf.Response> accountReplicateService(shelf.Request request) async {
  final requestString = await request.readAsString();
  if (requestString.isEmpty) {
    return shelf.Response(400,
        body: '400: Bad Request', headers: {'Cache-Control': 'no-store'});
  }
  print('accountReplicateService request: $requestString');
  final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64
      .decode(json.decode(requestString)['message']['data'].toString()))));

  final db = GetIt.instance<DatabaseService>().db;
  try {
    await db.open();
  } catch (e, s) {
    print(['Caught connect error', e, s]);
    return shelf.Response.internalServerError(
        body: 'Failed to connect to Database.',
        headers: {'Cache-Control': 'no-store'});
  }

  try {
    final account = message.account;
    await db.transaction((PostgreSQLExecutionContext ctx) async {

      await ctx.query("DELETE FROM public.profile where id = @idParam",
          substitutionValues: {"idParam": account.uid}, timeoutInSeconds: 10);

      await ctx.query("""
        INSERT INTO public.profile (
          id, dob, visible
        ) VALUES (
          @idParam, @dobParam, @visibleParam
        )
      """, substitutionValues: {
        "idParam": account.uid,
        "dobParam": account.dob,
        "visibleParam": account.profile.visible,
      }, timeoutInSeconds: 30);

      // Calculate the bloom filter bits
      final List<int> bitIndexes = BloomFilter.hashIndexesWithSize<String>(
          ServiceConfig.bloomFilterBitCount,
          ServiceConfig.bloomFilterExpectedItems,
          account.uid);

      final List<String> profileIds = await executeSearch(
          ctx, bitIndexes.first, account.profile, account.uid);

      final replicatedMessage = AccountReplicatedMessage((b) => b
        ..uid = account.uid
        ..bloomFilterBits = BuiltList<int>.from(bitIndexes).toBuilder()
        ..bloomFilterBitLastSearched = bitIndexes.first
        ..profiles = BuiltList<String>.from(profileIds).toBuilder());

      // publish account_replicated message
      final gapi = await GetIt.instance<GapiService>().gapiClient;

      final PubsubApi pubsubApi =
          GetIt.instance<PubSubService>().pubsubApi(gapi);
      await pubsubApi.projects.topics.publish(
          PublishRequest.fromJson({
            "messages": [
              {
                "attributes": <String, String>{
                  "uid": account.uid,
                },
                "data": base64Encode(
                    utf8.encode(json.encode(replicatedMessage.toJsonMap()))),
                "publishTime": DateTime.now().toUtc().toIso8601String()
              }
            ]
          }),
          'projects/onesceneapp/topics/account_replicated');

      print('Inserted: ${account.uid}.  x${profileIds.length} search results.');

    });

    return shelf.Response.ok('', headers: {'Cache-Control': 'no-store'});
  } catch (e, s) {
    print(e);
    print(s);
    return shelf.Response.internalServerError(
        headers: {'Cache-Control': 'no-store'});
  } finally {
    // Close the connection on every path so a failed request can't leak it.
    await db.close();
  }
}


@itsjoeconway
Contributor

I'm not 100% sure what the exact cause of your issue is, but you need a query queue if you are running into the connection limit of your database server, regardless of whether you are leaking connections. You can accomplish this easily by reusing the database connection: there is a queue built into it. Creating a new database connection and closing it for every HTTP request is more expensive than it needs to be.
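
A minimal sketch of that approach, assuming a single connection opened at startup (the sharedConnection name and the wiring around it are illustrative, not from the code above):

import 'package:postgres/postgres.dart';

// One connection for the whole process. package:postgres queues queries
// on the connection internally, so concurrent handlers wait their turn
// instead of exhausting the server's connection slots.
final sharedConnection = PostgreSQLConnection(
  'localhost', 5432, 'mydb',
  username: 'postgres', password: '123',
);

Future<void> main() async {
  await sharedConnection.open(); // open once, at startup
  // ... start the HTTP server; every handler reuses sharedConnection ...
}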

You can see how Aqueduct does this here, or use some other pooling package available on pub. FWIW, you could also use Aqueduct, cut all of this code in half, push the test surface into the framework instead of your code, and avoid difficult-to-debug statements like this one: final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64.decode(json.decode(requestString)['message']['data'].toString()))));

Author

jimmyff commented Oct 3, 2019

Okay, I'll have a go at switching over to Aqueduct. I initially thought it was maybe a little overkill for what I needed, but as I'm already running into issues using shelf, maybe I should just make the switch. Is it relatively straightforward to switch from shelf to Aqueduct?

How would it avoid those difficult to debug statements?

Thanks

@itsjoeconway
Contributor

Not too sure what it takes to switch, but it can't be too bad: the method you've shown here would be pretty close to valid just by replacing shelf. with aqueduct. The routing and application structure are different, but that's the easy part anyway.

  • See this section on body decoding to clean up that statement I pulled out.
  • See this section on exception handling behavior to reduce your need to try-catch and send responses.

@bubnenkoff

It seems I'm facing a very similar issue!

At some point requests start to be processed extremely slowly, and the speed doesn't recover until I kill the connection.

https://stackoverflow.com/questions/68375709/why-can-a-query-periodically-take-10-times-longer-to-execute

Collaborator

isoos commented Jul 15, 2021

@bubnenkoff: I've created package:postgres_pool for this exact reason: it lets you reconnect connections periodically based on configured settings like query count, connection age, and total session duration...

@bubnenkoff

@isoos oh cool! Thanks for the fast answer. I was just thinking about how to contact you, and even wrote to someone on Telegram (I don't know whether that was you or not). Am I right in understanding that postgres_pool is essentially a workaround to prevent such a leak?

Collaborator

isoos commented Jul 15, 2021

I have no Telegram, so that's not me :)

Am I right in understanding that postgres_pool is essentially a workaround to prevent such a leak?

It also has retry logic for transactions and concurrency control.

@bubnenkoff

@isoos but is there any chance the driver itself will get fixed in the near future? I can try postgres_pool, but I could also wait for the driver to be fixed.

Collaborator

isoos commented Jul 15, 2021

@bubnenkoff stablekernel's repo is no longer active; I maintain a fork and have uploader rights to the package. If somebody wants to work on this, I'll be happy to accept PRs, but for me reconnecting periodically solves the pain, so I won't put much effort into it myself... I'm not even sure whether it is something the Dart package should fix, or just some kind of resource accumulation on the connection that would happen anyway...

@bubnenkoff

@isoos thanks! Is it possible to do something like:

main() {
  // ...
  Timer.periodic(Duration(hours: 1), (timer) async {
    await connection.close();
    await connection.open();
  });
}

I don't want to rewrite my code for now and just need a simple hack. Is it a good idea to do this?

Collaborator

isoos commented Jul 15, 2021

I think postgres_pool will serve you well:
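
A minimal sketch of the basic usage (PgPool and PgEndpoint come from package:postgres_pool; the endpoint values and the query here are illustrative):

import 'package:postgres_pool/postgres_pool.dart';

final pg = PgPool(
  PgEndpoint(
    host: 'localhost',
    port: 5432,
    database: 'mydb',
    username: 'postgres',
    password: '123',
  ),
);

Future<void> example() async {
  // The pool leases a connection, runs the callback, then releases it,
  // replacing connections that have hit their configured limits.
  await pg.run((connection) async {
    await connection.query('SELECT 1');
  });
}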

@bubnenkoff

@isoos Could you help me? I don't write Dart very often. How do I pass the connection settings?

PgPoolSettings? settings;

void main() async {
  final pg = PgPool(
    PgEndpoint(
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        username: 'postgres',
        password: '123'),
    settings: settings, // what should be here?
  );
}

Collaborator

isoos commented Jul 23, 2021

@bubnenkoff: I've updated the example here:
https://github.com/agilord/postgres_pool/blob/master/example/example.dart#L4-L15
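
For reference, a sketch along the lines of that example (assuming PgPoolSettings exposes setters such as concurrency, maxConnectionAge, and maxQueryCount; the specific values are only examples):

final settings = PgPoolSettings()
  ..concurrency = 4 // at most 4 concurrent connections
  ..maxConnectionAge = Duration(hours: 1) // recycle connections hourly
  ..maxQueryCount = 1000; // reconnect after 1000 queries

final pg = PgPool(
  PgEndpoint(
    host: 'localhost',
    port: 5432,
    database: 'mydb',
    username: 'postgres',
    password: '123',
  ),
  settings: settings,
);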


bubnenkoff commented Aug 7, 2021 via email
