FSSpecStore: unclosed aiohttp resources when using gcsfs #2674

Closed
d-v-b opened this issue Jan 8, 2025 · 14 comments

@d-v-b
Contributor

d-v-b commented Jan 8, 2025

invoking this script:

# /// script
# dependencies = [
#   "fsspec[gcs]",
#   "zarr==3.0.0-rc2",
# ]
# ///

from zarr import open_group
from time import time
from zarr import config as zarr_config
def test_list_members() -> None:
    z = open_group(
        'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3', 
        mode='r',
        storage_options = {'token': 'anon'}, 
        use_consolidated=False)
    t = time()
    with zarr_config.set({'async.concurrency': 16}):
        members = z.members()
    elapsed = time() - t
    print(f'Discovered {len(members)} members in {elapsed:0.2f}s')

if __name__ == '__main__':
    test_list_members()

produces the following output. I truncated the list of unclosed connections, because it's rather long:

Discovered 277 members in 11.04s
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x100f17cb0>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x103f5bcb0>, 1508001.482631791), (<aiohttp.client_proto.ResponseHandler object at 0x104298b90>, 1508001.483053583), (<aiohttp.client_proto.ResponseHandler object at 0x103f5bad0>, 1508001.486612916), (<aiohttp.client_proto.ResponseHandler object at 0x104298470>, 1508001.491534833), (<aiohttp.client_proto.ResponseHandler object at 0x103f5b350>, 1508001.506562416), (<aiohttp.client_proto.ResponseHandler object at 0x103f5ac90>, 1508001.50906125), (<aiohttp.client_proto.ResponseHandler object at 0x104298d70>, 1508001.509699125), (<aiohttp.client_proto.ResponseHandler object at 0x104299c70>, 1508001.509885083), (<aiohttp.client_proto.ResponseHandler object at 0x104299310>, 1508001.511447541), (<aiohttp.client_proto.ResponseHandler object at 0x103f5ae70>, 1508001.523475833), (<aiohttp.client_proto.ResponseHandler object at 0x10429a990>, 1508001.524044291), (<aiohttp.client_proto.ResponseHandler object at 0x10429b6b0>, 1508001.543523625), (<aiohttp.client_proto.ResponseHandler object at 0x103ef70b0>, 1508001.56152575), (<aiohttp.client_proto.ResponseHandler object at 0x10429ab70>, 1508001.562331375), (<aiohttp.client_proto.ResponseHandler object at 0x104298650>, 1508001.586078666)
...

Since zarr-python doesn't import anything from aiohttp, I'm pretty sure this is down to gcsfs. The GCSFileSystem.close_session method caught my eye, but invoking it e.g. via store.fs.close_session(None, None) didn't resolve the unclosed connections warnings. @martindurant any recommendations for making these warnings go away (and ideally also closing the connections)?

@martindurant
Member

It may be useful to turn on the "gcsfs" logger, to see if multiple filesystem instances are being made or similar, and what exact calls are happening.

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

how would I do this in that code example?

@martindurant
Member

I usually do fsspec.utils.setup_logging(logger_name="gcsfs")
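
In the script above, that call would go before open_group(...), so that the requests made while opening the group are logged as well. If you'd rather not depend on the fsspec helper, a rough stdlib-only sketch is below. The function name enable_debug_logging is made up for illustration, and the format string is an assumption chosen to match the shape of the log lines quoted later in this thread.

```python
import logging


def enable_debug_logging(name: str = "gcsfs") -> logging.Logger:
    """Attach a stderr handler at DEBUG level to the named logger."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
        )
    )
    logger.handlers.clear()  # avoid duplicate lines if called more than once
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger


# Call this before open_group(...) so the initial requests are captured too.
logger = enable_debug_logging("gcsfs")
```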

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

here's a gist with the logging output, mostly i'm seeing a huge pile of GET requests (as expected). I didn't see anything indicating multiple filesystem instances, but I could have missed it.

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

these apparently identical requests at the top are a bit odd, not sure if that's a clue.

2025-01-08 15:34:59,751 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None
2025-01-08 15:35:00,364 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None
2025-01-08 15:35:00,964 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None
2025-01-08 15:35:01,746 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None
2025-01-08 15:35:02,683 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None
2025-01-08 15:35:03,377 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('gcp-public-data-arco-era5',), None

@martindurant
Member

apparently identical requests at the top are a bit odd

Probably these are file listings; they should be cached unless they have a limit on the number of items to return. Perhaps they are launched from multiple threads at the same time, which defeats the caching. So these are, I suppose, checking "directory"-ness, which for a bucket is always true. That's a bug causing redundant work, but it is not the cause of the unclosed connections.

So, some thoughts:

  • gcsfs has a weakref destructor that should close the session, which would close the connection and lingering responses ( https://github.com/fsspec/gcsfs/blob/main/gcsfs/core.py#L354 ). Note that this tries to guess whether the formal async closer can still be called or not, so there can be a race condition depending on when cleanup happens versus when the loop dies. More logging in that method may clarify what actually happens when the loop is provided by zarr rather than fsspec.
  • all requests go via here; using with is supposed to ensure that the request is cleaned up whatever else may happen. Of course, a connection pool doesn't have to evict objects until a slot is needed for something new.
  • the method fsspec.asyn._dump_running_tasks can be used to get the state of the async coroutine stack. This might result in a lot of output.
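
To make the first point concrete, here is a minimal sketch of the weakref-destructor pattern using only the standard library. FakeSession and FakeFileSystem are hypothetical stand-ins, not the gcsfs classes, and the close here is synchronous. The real closer has to close an async session, which is where the race with the event loop mentioned above comes from.

```python
import gc
import weakref


class FakeSession:
    """Hypothetical stand-in for an HTTP session that must be closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def close_session(session: FakeSession) -> None:
    # Must not hold a reference to the filesystem object itself,
    # otherwise the filesystem could never be garbage collected.
    session.close()


class FakeFileSystem:
    """Hypothetical filesystem that owns a session."""

    def __init__(self) -> None:
        self.session = FakeSession()
        # Run close_session(self.session) when this instance is collected,
        # or at interpreter exit, whichever comes first.
        weakref.finalize(self, close_session, self.session)


fs = FakeFileSystem()
session = fs.session
del fs        # drop the last reference to the filesystem
gc.collect()  # not strictly needed in CPython, but makes the intent explicit
print(session.closed)
```

The callback receives the session rather than the filesystem, so the finalizer does not keep the filesystem alive.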

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

I dropped a breakpoint in aiohttp.client.py::ClientSession.close and it seems like that method is never getting called. So it's possible that something is going sideways in GCSFileSystem.close_session that should otherwise trigger closure of the client session.

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

digging a bit more into it, I think the logic of close_session needs to be fixed.

In my example, loop is None when close_session is invoked, which means the entire function is a no-op because of an unhandled conditional branch.

import asyncio

import fsspec
from fsspec import asyn


def close_session(loop, session):
    if loop is not None and session is not None: # <---- If this conjunction is not true, then this function does nothing
        if loop.is_running():
            try:
                # if we are already inside a running loop, schedule the close there
                current_loop = asyncio.get_running_loop()
                current_loop.create_task(session.close())
                return
            except RuntimeError:
                pass

            try:
                # otherwise block briefly while the fsspec loop closes the session
                asyn.sync(loop, session.close, timeout=0.1)
            except fsspec.FSTimeoutError:
                pass
        else:
            pass
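
The no-op can be reproduced without gcsfs or aiohttp. The sketch below is only an illustration under stated assumptions: FakeSession is a hypothetical stand-in for the aiohttp session, close_session_original keeps just the guard from the snippet above, and close_session_all_branches is one possible way to cover the missing branches. It is not the change made in fsspec/gcsfs#657.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an aiohttp.ClientSession."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


def close_session_original(loop, session) -> None:
    """Same guard as the snippet above, reduced to the part that matters."""
    if loop is not None and session is not None:
        if loop.is_running():
            # Only safe when called from a thread other than the loop's own.
            asyncio.run_coroutine_threadsafe(session.close(), loop).result(timeout=0.1)
    # loop is None, or the loop has stopped: nothing gets closed


def close_session_all_branches(loop, session) -> None:
    """Hypothetical variant that closes the session on every path."""
    if session is None or session.closed:
        return
    if loop is not None and loop.is_running():
        # Only safe when called from a thread other than the loop's own.
        asyncio.run_coroutine_threadsafe(session.close(), loop).result(timeout=0.1)
        return
    # No usable loop: run the closer on a short-lived private loop.
    # asyncio.run raises RuntimeError if this thread already has a running loop.
    asyncio.run(session.close())


session = FakeSession()
close_session_original(None, session)
print(session.closed)  # still open, the guard skipped everything
close_session_all_branches(None, session)
print(session.closed)
```

Whether closing a real aiohttp session on a different loop than the one it was created on is safe is a separate question that this sketch does not answer.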

@martindurant
Member

Perhaps you can try with fsspec/gcsfs#657?

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

this works like a charm, thanks @martindurant

# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "fsspec",
#     "gcsfs",
#     "zarr==3.0.0-rc2",
# ]
#
# [tool.uv.sources]
# gcsfs = { git = "https://github.com/martindurant/gcsfs.git", rev = "better_shutdown" }
# ///

from zarr import open_group
from time import time
from zarr import config as zarr_config

# fsspec.utils.setup_logging(logger_name="gcsfs")

def test_list_members() -> None:
    z = open_group(
        'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3', 
        mode='r',
        storage_options = {'token': 'anon'}, 
        use_consolidated=False)
    t = time()
    with zarr_config.set({'async.concurrency': 16}):
        members = z.members()
    elapsed = time() - t
    print(f'Discovered {len(members)} members in {elapsed:0.2f}s')

if __name__ == '__main__':
    test_list_members()

@d-v-b
Contributor Author

d-v-b commented Jan 8, 2025

this should be fixed by fsspec/gcsfs#657, which will be part of a gcsfs release later this month. closing.

d-v-b closed this as completed Jan 8, 2025

@tlambert03

Having a similar issue, but not sure if it's related (since this is using HTTPFileSystem rather than GCSFileSystem):

running this script:

import zarr
URL = "https://s3.embl.de/i2k-2020/ngff-example-data/v0.4/tczyx.ome.zarr"
zarr_arr = zarr.open(URL, mode="r")

gives this on exit:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x101948200>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x1072e3d10>, 1066999.872162625), (<aiohttp.client_proto.ResponseHandler object at 0x107554050>, 1066999.872625166), (<aiohttp.client_proto.ResponseHandler object at 0x1072e3ef0>, 1066999.87292075), (<aiohttp.client_proto.ResponseHandler object at 0x1075547d0>, 1067000.058575958)])']
connector: <aiohttp.connector.TCPConnector object at 0x1072d8050>

(which in pytest, with warnings as errors raises a huge long list of ResourceWarning)

Exception ignored in: <function ClientSession.__del__ at 0x13d4a9f80>
Traceback (most recent call last):
  File "/Users/talley/dev/self/ndv/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 437, in __del__
ResourceWarning: Unclosed client session <aiohttp.client.ClientSession object at 0x13b39f260>
Exception ignored in: <function BaseConnector.__del__ at 0x13d4600e0>
Traceback (most recent call last):
  File "/Users/talley/dev/self/ndv/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 325, in __del__
ResourceWarning: Unclosed connector <aiohttp.connector.TCPConnector object at 0x13d4a7e90>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x101c265c0>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/sslproto.py", line 123, in __del__
ResourceWarning: unclosed transport <asyncio._SSLProtocolTransport object>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=20>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=22>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=24>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=21>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=19>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=10>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=12>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=23>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=14>
Exception ignored in: <function _SelectorTransport.__del__ at 0x101ccc680>
Traceback (most recent call last):
  File "/Users/talley/.local/share/uv/python/cpython-3.12.6-macos-aarch64-none/lib/python3.12/asyncio/selector_events.py", line 879, in __del__
ResourceWarning: unclosed transport <_SelectorSocketTransport fd=13>
Exception ignored in: <socket.socket fd=20, family=2, type=1, proto=6, laddr=('10.119.56.87', 57999), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=20, family=2, type=1, proto=6, laddr=('10.119.56.87', 57999), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=22, family=2, type=1, proto=6, laddr=('10.119.56.87', 58001), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=22, family=2, type=1, proto=6, laddr=('10.119.56.87', 58001), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=24, family=2, type=1, proto=6, laddr=('10.119.56.87', 58003), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=24, family=2, type=1, proto=6, laddr=('10.119.56.87', 58003), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=21, family=2, type=1, proto=6, laddr=('10.119.56.87', 58000), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=21, family=2, type=1, proto=6, laddr=('10.119.56.87', 58000), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=19, family=2, type=1, proto=6, laddr=('10.119.56.87', 57998), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=19, family=2, type=1, proto=6, laddr=('10.119.56.87', 57998), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=10, family=2, type=1, proto=6, laddr=('10.119.56.87', 57992), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=10, family=2, type=1, proto=6, laddr=('10.119.56.87', 57992), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=12, family=2, type=1, proto=6, laddr=('10.119.56.87', 57993), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=12, family=2, type=1, proto=6, laddr=('10.119.56.87', 57993), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=23, family=2, type=1, proto=6, laddr=('10.119.56.87', 58002), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=23, family=2, type=1, proto=6, laddr=('10.119.56.87', 58002), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=14, family=2, type=1, proto=6, laddr=('10.119.56.87', 57995), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=14, family=2, type=1, proto=6, laddr=('10.119.56.87', 57995), raddr=('194.94.45.80', 443)>
Exception ignored in: <socket.socket fd=13, family=2, type=1, proto=6, laddr=('10.119.56.87', 57994), raddr=('194.94.45.80', 443)>
ResourceWarning: unclosed <socket.socket fd=13, family=2, type=1, proto=6, laddr=('10.119.56.87', 57994), raddr=('194.94.45.80', 443)>

should I open an independent issue? Or is this a usage error now? (I didn't see any mention of a context manager I should use in the migration guide.)
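
For context on the output above: the tracebacks all point at __del__ methods, which emit a ResourceWarning when an object is collected without having been closed. With warnings turned into errors, an exception raised inside __del__ cannot propagate, so Python reports it as "Exception ignored in". A stdlib-only sketch of the same pattern follows. LeakyResource and count_resource_warnings are hypothetical names, not aiohttp or zarr code.

```python
import warnings


class LeakyResource:
    """Hypothetical resource that warns in __del__ if close() was never called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __del__(self) -> None:
        if not self.closed:
            warnings.warn(f"Unclosed resource {self!r}", ResourceWarning, source=self)


def count_resource_warnings(close_first: bool) -> int:
    """Create a resource, optionally close it, drop it, and count the warnings."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        resource = LeakyResource()
        if close_first:
            resource.close()
        del resource  # CPython runs __del__ here, once the last reference is gone
    return sum(issubclass(w.category, ResourceWarning) for w in caught)


leaked = count_resource_warnings(close_first=False)
closed = count_resource_warnings(close_first=True)
print(leaked, closed)
```

So the warnings go away only if something closes the session before the objects are collected, which is what the gcsfs fix discussed earlier in this thread is meant to do.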

@d-v-b
Contributor Author

d-v-b commented Jan 9, 2025

I wonder if the fix @martindurant put together in fsspec/gcsfs#657 also needs to be applied to HTTPFileSystem?

@martindurant
Member

Yes, I would say the cleanup behaviour for HTTP should be the same as for gcsfs
