Lesson 4: Persistence and Streaming -> AttributeError: '_GeneratorContextManager' object has no attribute 'get_next_version'

Hi everyone!

I get the error below when I execute this code:

for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])


→ 663 self.checkpointer_get_next_version = checkpointer.get_next_version
664 self.checkpointer_put_writes = checkpointer.put_writes
665 else:

AttributeError: '_GeneratorContextManager' object has no attribute 'get_next_version'

Has anyone had the same problem?

Thanks!

It works if you put all the code that depends on "memory" inside a with statement. In recent langgraph versions, SqliteSaver.from_conn_string returns a context manager rather than the saver itself, so it has to be entered with with to yield the actual checkpointer:

from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    messages = [HumanMessage(content="What is the weather in sf?")]
    thread = {"configurable": {"thread_id": "1"}}

    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v['messages'])

Does anyone have a solution that doesn't require this refactoring?

Thanks!


There is a quite straightforward solution. Create a .db file (e.g. checkpoints.db). If you create this .db file at the same level as the script/notebook, you can use it directly:

import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

db_path = 'checkpoints.db'
conn = sqlite3.connect(db_path, check_same_thread=False)

memory = SqliteSaver(conn)

Remark: check_same_thread=False is needed because sqlite3 connections refuse, by default, to be used from any thread other than the one that created them, and the graph may write checkpoints from a different thread.
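
For completeness, a minimal usage sketch of the file-backed saver, assuming the Agent, model, tool, and prompt objects from the lesson:

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# A file-backed database also survives notebook restarts, unlike ":memory:"
conn = sqlite3.connect('checkpoints.db', check_same_thread=False)
memory = SqliteSaver(conn)

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in sf?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

conn.close()  # close the connection when you are done with the agent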


This works for the synchronous example. The code can be spread across cells while still using the context.

from contextlib import ExitStack
from langgraph.checkpoint.sqlite import SqliteSaver

stack = ExitStack()
memory = stack.enter_context(SqliteSaver.from_conn_string(":memory:"))

abot = Agent(model, [tool], system=prompt, checkpointer=memory)
messages = [HumanMessage(content="What is the weather in Issaquah?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

# Exit all contexts
stack.close()

Here is a similar solution for the async version:

import asyncio
from contextlib import AsyncExitStack
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

stack = AsyncExitStack()
memory = await stack.enter_async_context(AsyncSqliteSaver.from_conn_string(":memory:"))

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

await stack.aclose()
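
As with the synchronous version, the AsyncExitStack keeps the saver's context open across notebook cells until stack.aclose() is awaited at the end.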

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

fixed the issue for me. MemorySaver is not a context manager, so it can be passed to the agent directly (checkpoints then live only in process memory).


memory = MemorySaver()

It works!!
Then what about AsyncSqliteSaver?
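
A sketch of the direct async analogue of the with fix above, assuming the same Agent, model, tool, and prompt from the lesson (AsyncSqliteSaver.from_conn_string is likewise an async context manager in recent versions):

from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# Entering the async context yields the actual checkpointer
async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    messages = [HumanMessage(content="What is the weather in sf?")]
    thread = {"configurable": {"thread_id": "1"}}

    async for event in abot.graph.astream({"messages": messages}, thread):
        for v in event.values():
            print(v)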

I changed one function from this:

def call_openai(self, state: AgentState):
    messages = model.invoke(state['messages'])
    if self.system:
        messages = [SystemMessage(content=self.system)] + messages
    message = self.model.invoke(messages)
    return {'messages': [message]}

to this:

def call_openai(self, state):
    messages = state['messages']
    if self.system:
        messages = [SystemMessage(content=self.system)] + messages
    response = self.model.invoke(messages)
    return {'messages': [response]}

and changed every interaction and call to the model that looked like this:

for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

to this:

with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    agent = Agent(model, [tool], checkpointer, system=prompt)

    # Test messages
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "1"}}

    # Stream the events
    for event in agent.graph.stream({"messages": messages}, thread):
        for value in event.values():
            print(value['messages'])

And now it's working well!!!

Hi,

Have you figured out how to fix it for this part:

from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
memory = AsyncSqliteSaver.from_conn_string(":memory:")

It is clear that

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

should be used instead, but this is still not working:

memory = AsyncSqliteSaver.from_conn_string(":memory:")

and if I use:
memory = MemorySaver()

I don't get the tokens :frowning:
I implemented 5 different versions with the suggestions from here and also from gpt-o1, but none returns tokens, even though token streaming is set to true for the model.
According to gpt-o1's debugging: based on my debug output, the model is sending only a single chunk with either empty content or the full final content, i.e. no incremental tokens at all.
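
One way to narrow this down, as a sketch (assuming model is the ChatOpenAI instance from the lesson): stream from the model directly, outside the graph, to check whether the model itself emits incremental chunks:

# If this prints many small chunks, the model streams fine and the problem
# is in the graph/event plumbing; if it prints one big chunk, the model
# call itself is not streaming.
async for chunk in model.astream("Write one sentence about the weather."):
    print(repr(chunk.content), end="|")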

It would be good if this lesson were updated to reflect the current state of the library.

Thank you, this works for me. Much appreciated!

This helped me. Thank you!