Run Class methods in threads (python)

Python, Multithreading, Class, Methods

Python Problem Overview


I'm currently learning Python and classes, and I have a basic question that I couldn't find an answer to. Let's say I have this dummy class:

class DomainOperations:
    def __init__(self, domain):
        self.domain = domain
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        # resolve domain to ipv4 and save to self.domain_ip
        pass

    def generate_website_thumbnail(self):
        # generate website thumbnail and save the url to self.website_thumbnail
        pass

I want to run resolve_domain and generate_website_thumbnail simultaneously, and when both threads have finished I want to print the IP and the thumbnail.

EDIT: I know I should use threads, maybe something like this:

import threading

r = DomainOperations('google.com')

t1 = threading.Thread(target=r.resolve_domain)
t1.start()

t2 = threading.Thread(target=r.generate_website_thumbnail)
t2.start()

But should I use them outside the Class? Should I write another Class to handle Threads?

What is the right way to do that?

Python Solutions


Solution 1 - Python

If you call them from the class, it is as simple as:

import threading

class DomainOperations:

    def __init__(self):
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        self.domain_ip = 'foo'

    def generate_website_thumbnail(self):
        self.website_thumbnail = 'bar'

    def run(self):
        t1 = threading.Thread(target=self.resolve_domain)
        t2 = threading.Thread(target=self.generate_website_thumbnail)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(self.domain_ip, self.website_thumbnail)

if __name__ == '__main__':
    d = DomainOperations()
    d.run()
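
If you would rather keep threading out of the class entirely, the standard library's concurrent.futures.ThreadPoolExecutor can run both methods from the calling code. The following is a minimal sketch rather than part of the original answer: the method bodies are the same 'foo' and 'bar' placeholders, the constructor takes `domain` as in the question, and `run_operations` is a hypothetical helper name.

```python
from concurrent.futures import ThreadPoolExecutor


class DomainOperations:
    def __init__(self, domain):
        self.domain = domain
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        # placeholder: a real implementation would resolve self.domain here
        self.domain_ip = 'foo'

    def generate_website_thumbnail(self):
        # placeholder: a real implementation would build the thumbnail URL here
        self.website_thumbnail = 'bar'


def run_operations(ops):
    # hypothetical helper: run both methods in worker threads and wait for them
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(ops.resolve_domain),
            pool.submit(ops.generate_website_thumbnail),
        ]
        for future in futures:
            # result() blocks until the task is done and re-raises its exception, if any
            future.result()
    return ops.domain_ip, ops.website_thumbnail


if __name__ == '__main__':
    d = DomainOperations('google.com')
    print(run_operations(d))
```

With this layout the class stays a plain data holder, and the caller decides whether the methods run in threads or sequentially.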

Solution 2 - Python

You can have DomainOperations inherit from the Thread class, which keeps the code cleaner and easier to follow. You have to override the run() method.

from threading import Thread

class DomainOperations(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        self.domain_ip = 'foo'

    def generate_website_thumbnail(self):
        self.website_thumbnail = 'bar'

    def run(self):
        # the thumbnail is generated on a second, newly created thread;
        # it is started first so that both jobs can overlap
        thread2 = Thread(target=self.generate_website_thumbnail)
        thread2.start()
        # the domain is resolved on the first thread (this one)
        self.resolve_domain()
        # the first thread waits for thread2; it must not call self.join(),
        # because a thread cannot join itself (RuntimeError)
        thread2.join()
        # here it prints the ip and thumbnail before the first thread exits
        print(self.domain_ip, self.website_thumbnail)

You then start the thread like this:

if __name__ == '__main__':
    thread1 = DomainOperations()
    thread1.start()
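
One detail worth spelling out for the subclass approach: waiting for the thread belongs in the caller, not inside run(). A thread cannot join itself, and calling self.join() from within run() raises RuntimeError. The sketch below illustrates this with a hypothetical `Worker` class (a stand-in for DomainOperations with one placeholder job); the `join_error` attribute exists only to make the error visible.

```python
from threading import Thread


class Worker(Thread):
    # hypothetical stand-in for DomainOperations with a single placeholder job
    def __init__(self):
        super().__init__()
        self.result = ''
        self.join_error = None

    def run(self):
        try:
            # a thread cannot wait for itself; Python raises RuntimeError here
            self.join()
        except RuntimeError as exc:
            self.join_error = exc
        self.result = 'foo'


if __name__ == '__main__':
    w = Worker()
    w.start()
    w.join()  # the caller waits for the thread to finish
    print(w.result, type(w.join_error).__name__)
```

After join() returns in the caller, attributes set inside run() can be read safely from the main thread.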

Solution 3 - Python

import csv
import json
import threading
import time

import requests

# url, headers and read_csv() are not shown in the original answer;
# they are assumed to be defined elsewhere in the script.

# serializes writes to the shared CSV file across threads
csv_lock = threading.Lock()


def post_test(tbid, line_num, response_time):
    """
    :param tbid: parameter id
    :return:
    """

    # request parameters
    data = {'tbId': tbid, 'conditions': [{"key": "", "type": 1}], 'pageNum': 1, 'pageSize': 12}
    # request start time
    start = time.time()
    # POST request
    r = requests.post(url=url, data=json.dumps(data), headers=headers)
    # request end time
    end = time.time()
    # keep two decimal places
    final_time = float('%.2f' % float(end - start))
    text = json.loads(r.text)
    # file I/O: only responses with status code 200 are written
    with csv_lock, open('text6.csv', 'a', newline='') as csvfile:
        if text['statusCode'] == '200':
            throughput = line_num * response_time / final_time
            throughput = float('%.2f' % float(throughput))
            print('the perf_counter time of %s is %s and the content is %s ,throughput is %s' % (
                tbid, final_time, text, throughput))
            spamwriter = csv.writer(csvfile, dialect='excel')
            spamwriter.writerow([tbid, final_time, throughput])


def start_thread(csv_name):
    tbid, response_time_sort, throughput_sort = read_csv(csv_name)
    print(tbid)
    line_num = len(tbid)
    response_times = 5

    for j in range(response_times):
        threads = []
        for i in tbid:
            t = threading.Thread(target=post_test, args=(i, line_num, response_times))
            t.start()
            threads.append(t)
        # join only after the whole batch has been started; joining right
        # after start() would run the requests one at a time
        for t in threads:
            t.join()

I don't know how to call a method on a class, especially one that takes initialization parameters, but you can try this approach. It uses multiple threads to solve the problem.
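
Calling a method on an instance from a thread works even when the class takes initialization parameters: create the object first, then pass the bound method as `target` and any extra arguments through `args`. The sketch below is an illustration under assumptions, not the answerer's code; `Greeter`, `greet` and `greet_all` are hypothetical names, and the lock is there because all threads append to the same list.

```python
import threading


class Greeter:
    # hypothetical class, used only to illustrate the call pattern
    def __init__(self, greeting):
        self.greeting = greeting
        self.messages = []
        self._lock = threading.Lock()

    def greet(self, name):
        message = '%s, %s' % (self.greeting, name)
        # guard the shared list so concurrent appends stay consistent
        with self._lock:
            self.messages.append(message)


def greet_all(greeter, names):
    # one thread per name; the bound method carries `self`, args carries the rest
    threads = [threading.Thread(target=greeter.greet, args=(name,)) for name in names]
    for t in threads:
        t.start()
    # join only after every thread has been started, so they can overlap
    for t in threads:
        t.join()
    # sorted, because the order in which threads finish is not guaranteed
    return sorted(greeter.messages)


if __name__ == '__main__':
    g = Greeter('Hello')
    print(greet_all(g, ['Ada', 'Bob']))
```

The same pattern applies to the question's class: `threading.Thread(target=r.resolve_domain)` already passes a bound method, so `r` keeps the `domain` it was constructed with.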

Attributions

All content for this solution is sourced from the original question on Stack Overflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stack Overflow
Question | nacholibre | View Question on Stack Overflow
Solution 1 - Python | A. Rodas | View Answer on Stack Overflow
Solution 2 - Python | Hafiz Hashim | View Answer on Stack Overflow
Solution 3 - Python | user10845203 | View Answer on Stack Overflow