最近在学习生产者消费者模型,假如有如下需求:从某个网址读取 log ,提取出用户 id 然后发送邮件。在不涉及数据库操作的情况下,下面的示例代码(打不开 gist 只能贴在这里了。。)可以运行。
但现在问题来了,如果需要将每一次读取的 log 内容和发送记录都存储在数据库里,保证同一个 id 只收到一次,这个数据库操作应该放在哪个线程里面呢? 用的是 kennethreitz 大神的 records + Python 自带的 Sqlite3 ,尝试了在 2 个 Thread __init__
的时候分别连接同一个数据库文件,但是提示在一个线程内创建的连接不能在另一个线程内使用,这里有些迷惑。 Or 这个需求可以用另外的原理完成?
import threading
import time
import Queue
Q = Queue.Queue()
class ProducerThread(threading.Thread):
def __init__(self, out_q):
threading.Thread.__init__(self, name='Producer')
self.q = out_q
def run(self):
while True:
# 这里读取 log
log = getlog()
if log:
for each in log:
self.q.put(each)
else:
print('NO MORE LOG')
time.sleep(60)
class ConsumerThread(threading.Thread):
def __init__(self, out_q):
threading.Thread.__init__(self, name='Consumer')
self.q = out_q
def run(self):
while True:
# consume the data.
chunk = self.q.get()
print(u'开始发送...{0}'.format(chunk))
time.sleep(2) # 模拟发送
self.q.task_done()
def main():
t = ProducerThread(Q)
t.start()
c = ConsumerThread(Q)
c.start()
Q.join()
if __name__ == "__main__":
main()
1
lonelinsky 2016-12-03 13:16:30 +08:00
数据库本身就有读写锁,直接在两个线程里面各自连接数据库不就好了嘛
|
2
dofine OP @lonelinsky 现在是有 3 个线程,如果我连接数据库写在 producer 的 `__init__` 里面,会提示这个 sqlite3 object 是在主线程里创建的呢。。我的理解是这样是在 producer 这个线程内创建的啊。。?
```python def __init__(self): self.db = sqlite3.connect('db.db') ``` |
3
lonelinsky 2016-12-03 13:34:57 +08:00 1
@dofine
t = ProducerThread(Q) 这个时候就会调用了__init__了,所以是在主线程的, t.start() 会调用 run 方法,你把连接的代码挪到 run 方法里面去应该就好了 |
4
dofine OP @lonelinsky en ,测试了一下目前是好的。。现在要处理两个线程同时写入这个数据库的问题了。。是不是又要开一个队列执行数据库的写操作呀。
|
5
lonelinsky 2016-12-03 13:53:19 +08:00
@dofine 不需要,你直接写就好,数据库层有读写锁保护的,没有问题的
|
6
lonelinsky 2016-12-03 13:55:44 +08:00
当然,如果你的写并发量大的话,用队列,然后起一个专门的数据库写线程,性能会好一点
|
7
dofine OP @lonelinsky python + sqlite3 好像不行呀,万一我那个写入 log 和写入发送记录同时写入就会报错== (其实这两个放在两个数据库文件里会不会更好?
|
8
lonelinsky 2016-12-03 14:19:04 +08:00
@dofine
看了下文档果然,不过你可以控制下超时时间 https://docs.python.org/2/library/sqlite3.html#sqlite3.connect 针对你的需求来说,反正是完全独立的表,之间不需要建立关联的直接分两个数据库确实比较好 |