8

我正在尝试使用 python libtorrent 每天获取大约 10k+ 种子的元数据。

这是当前的代码流

  1. 启动 libtorrent 会话。
  2. 获取我们需要在过去 1 天内上传的元数据的种子总数。
  3. 以块的形式从 DB 中获取 torrent 哈希值
  4. 使用这些哈希创建磁力链接,并通过为每个磁力 URI 创建句柄在会话中添加这些磁力 URI。
  5. 在获取元数据时休眠一秒钟,并继续检查是否找到元数据。
  6. 如果收到元数据,则将其添加到数据库中,否则检查我们是否一直在寻找元数据大约 10 分钟,如果是,则删除句柄,即现在不再查找元数据。
  7. 无限期地做上面。并保存会话状态以备将来使用。

到目前为止,我已经尝试过了。

#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri

import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity 
import os
from datetime import date, timedelta

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"

if(os.path.isfile(session_save_filename)):

    fileread = open(session_save_filename, 'rb')
    session.load_state(lt.bdecode(fileread.read()))
    fileread.close()
    print('session loaded from file')
else:
    print('new session started')

session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)

session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()

alive = True
while alive:

    db_conn = MySQLdb.connect(  host = '',  user = '',  passwd = '',    db = '',    unix_socket='/mysql/mysql.sock') # Open database connection
    #print('reconnecting')
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ;

    yesterday = date.today() - timedelta(1)
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
    #print(yesterday)

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
    #print(total_count_query)
    try:
        total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
        total_count_cursor.execute(total_count_query) # Execute the SQL command
        total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
        total_count = total_count_results[0]
        print(total_count)
    except:
            print "Error: unable to select data"

    total_pages = total_count/subset_count
    #print(total_pages)

    current_page = 1
    while(current_page <= total_pages):
        from_count = (current_page * subset_count) - subset_count

        #print(current_page)
        #print(from_count)

        hashes = []

        get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
        #print(get_mysql_data_query)
        try:
            get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
            get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
            get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
            for row in get_mysql_data_results:
                hashes.append(row[0].upper())
        except:
            print "Error: unable to select data"

        #print(hashes)

        handles = []

        for hash in hashes:
            tempdir = tempfile.mkdtemp()
            add_magnet_uri_params = {
                'save_path': tempdir,
                'duplicate_is_error': True,
                'storage_mode': lt.storage_mode_t(2),
                'paused': False,
                'auto_managed': True,
                'duplicate_is_error': True
            }
            magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
            #print(magnet_uri)
            handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
            handles.append(handle) #push handle in handles list

        #print("handles length is :")
        #print(len(handles))

        while(len(handles) != 0):
            for h in handles:
                #print("inside handles for each loop")
                if h.has_metadata():
                    torinfo = h.get_torrent_info()
                    final_info_hash = str(torinfo.info_hash())
                    final_info_hash = final_info_hash.upper()
                    torfile = lt.create_torrent(torinfo)
                    torcontent = lt.bencode(torfile.generate())
                    tfile_size = len(torcontent)
                    try:
                        insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
                        insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""",  [final_info_hash , torcontent] )
                        db_conn.commit()
                        #print "data inserted in DB"
                    except MySQLdb.Error, e:
                        try:
                            print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
                        except IndexError:
                            print "MySQL Error: %s" % str(e)    


                    shutil.rmtree(h.save_path())    #   remove temp data directory
                    session.remove_torrent(h) # remove torrnt handle from session   
                    handles.remove(h) #remove handle from list

                else:
                    if(h.status().active_time > 600):   # check if handle is more than 10 minutes old i.e. 600 seconds
                        #print('remove_torrent')
                        shutil.rmtree(h.save_path())    #   remove temp data directory
                        session.remove_torrent(h) # remove torrnt handle from session   
                        handles.remove(h) #remove handle from list
                sleep(1)        
                #print('sleep1')

        #print('sleep10')
        #sleep(10)
        current_page = current_page + 1

        #save session state
        filewrite = open(session_save_filename, "wb")
        filewrite.write(lt.bencode(session.save_state()))
        filewrite.close()


    print('sleep60')
    sleep(60)

    #save session state
    filewrite = open(session_save_filename, "wb")
    filewrite.write(lt.bencode(session.save_state()))
    filewrite.close()

我试着让上面的脚本在一夜之间运行,发现在隔夜会话中只找到了大约 1200 个 torrent 的元数据。所以我正在寻找提高脚本性能的方法。

我什至尝试解码save_state文件并注意到DHT nodes我连接了 700 多个文件。所以它不像DHT是没有运行,

我打算做的是,keep the handles active在不获取元数据的情况下无限期地在会话中。如果在 10 分钟内没有获取元数据,则不会在 10 分钟后删除句柄,就像我目前正在做的那样。

我对 lib-torrent python 绑定没有几个问题。

  1. 我可以继续运行多少个手柄?运行手柄有限制吗?
  2. 运行 10k+ 或 100k 句柄会减慢我的系统吗?还是吃掉资源?如果是,那么哪些资源?我的意思是内存,网络?
  3. 我在防火墙后面,可能是阻塞的传入端口导致元数据获取速度慢吗?
  4. 可以像 router.bittorrent.com 或任何其他禁止我的 IP 地址的 DHT 服务器发送太多请求吗?
  5. 如果其他同行发现我发出太多请求只是为了获取元数据,他们可以禁止我的 IP 地址吗?
  6. 我可以运行这个脚本的多个实例吗?或者可能是多线程?它会提供更好的性能吗?
  7. 如果使用同一脚本的多个实例,每个脚本将根据我使用的 ip 和端口获得唯一的节点 ID,这是可行的解决方案吗?

有没有更好的方法?为了实现我正在努力的目标?

4

1 回答 1

3

我无法回答特定于 libtorrent API 的问题,但您的一些问题通常适用于 bittorrent。

运行 10k+ 或 100k 句柄会减慢我的系统吗?还是吃掉资源?如果是,那么哪些资源?我的意思是内存,网络?

元数据下载不应该使用太多资源,因为它们还不是完整的种子下载,即它们不能分配实际文件或类似的东西。但是一旦他们抓住了其中的第一块,他们将需要一些内存/磁盘空间来存储元数据本身。

我在防火墙后面,可能是阻塞的传入端口导致元数据获取速度慢吗?

是的,通过减少可以建立连接的对等点的数量,在对等点数量较少的集群上获取元数据(或根本建立任何连接)变得更加困难。

NAT 可能会导致同样的问题。

可以像 router.bittorrent.com 或任何其他禁止我的 IP 地址的 DHT 服务器发送太多请求吗?

router.bittorrent.com 是一个引导节点,而不是服务器本身。查找不查询单个节点,它们查询许多不同的(数百万)。但是,是的,个别节点可以禁止,或者更有可能限制您。

这可以通过寻找随机分布的 ID 在 DHT 密钥空间中分散负载来缓解。

我可以运行这个脚本的多个实例吗?或者可能是多线程?它会提供更好的性能吗?

AIUI libtorrent 足够无阻塞或多线程,您可以一次调度多个种子。

我不知道 libtorrent 是否对传出的 DHT 请求有速率限制。

如果使用同一脚本的多个实例,每个脚本将根据我使用的 ip 和端口获得唯一的节点 ID,这是可行的解决方案吗?

如果您指的是 DHT 节点 ID,那么它们是从 IP 派生的(根据BEP 42),而不是端口。虽然包含了一些随机元素,但每个 IP 可以获得有限数量的 ID。

其中一些可能也适用于您的场景:http ://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

另一个选择是我自己的 DHT 实现,其中包括一个用于批量获取种子的 CLI。

于 2015-10-03T13:58:25.093 回答