Python 进阶理解 [draft]
Xi’an, China: ☀️ 🌡️-4°C 🌬️↙4km/h
为什么判断 None 要用 is?
# 常见三种判断
a = None
if a:
print('None')
if a == None:
print('None')
if a is None:
print('None')
- 第一种情况,空的集合或字典等,判断的时候也会被视为 False 对待,但有时侯我们需要对输入进行细分,同时通过改变数据内置函数
__bool__
,可以改变布尔值;
class MyStructure:
def __bool__(self):
return True
d = MyStructure()
if not d:
print("It's None")
- 第二种情况,使用
==
判断,依然可以被重载,可使用__eq__
函数进行修改,同时,使用rich compare
进行比较,速度会比较慢 - 第三种情况,使用
is
进行判断,直接对比两个 object 的指针,速度更快
装饰器详解
所有的函数都是对象
- 函数作为变量传入其他函数
# 函数作为变量传入其他函数
def double(x):
return 2 * x
def triple(x):
return 3 * x
def calc_number(func,x):
print(func(x))
calc_number(double, 2)
calc_number(triple, 2)
- 函数本身也可作为返回值
def get_mul_func(n):
def multipl(x):
return n*x
return multipl
double = get_mul_func(2)
triple = get_mul_func(3)
print(double(3))
print(triple(3))
- dec 一般认为参数参数和返回值都是函数的函数
import time
def timeit(f):
def wrapper(*args, **kwargs):
start = time.time()
ret = f(*args, **kwargs)
print(time.time()-start)
return ret
return wrapper
@timeit
def func1(x):
time.sleep(x)
@timeit
def func2(x):
return x * 2
@timeit
def add(x, y):
return x+y
print(func1(1))
print(func2(2))
print(add(1,2))
- 如果存在带参数的函数,例如
@timeit(10)
,相当于timeit(10)(double)
,代入参数 10 先返回一个函数,再将 double 代入返回
CodeObject 理解
import dis
def f(a, b=2, /,*agrs, **kwargs):
pass
code = f.__code__
print(dis.dis(f))
print(dir(code))
print(code.co_code) # return byte code
# meta data
print(code.co_name) # function name
print(code.co_filename) # file name
print(code.co_lnotab) # 以二进制形式保存源代码的行数(已压缩)
print(code.co_stacksize) # 占用字节大小
print(code.co_flags) # 判断是否为genernator等,不同判断有不同的行为
# number of arguments(not including keyword
# only arguments, * or ** args)
print(code.co_argcount)
# number of positional only arguments
# 把函数修改为f(a, b=2, /, *agrs, **kwargs)时候,/前的参数都是为
# postional onlu args为2,调用时候不能写为f(a=1),可为f(1)、f(1,2)
print(code.co_posonlyargcount)
# number of keyword only arguments (not
# including ** args)
# 将函数修改为(a, *, b=2, *agrs, **kwargs),*之后的参数都需要以
# 关键字参数传入
print(code.co_kwonlyargcount)
"""
> output:
115 0 LOAD_CONST 0 (None)
2 RETURN_VALUE
None
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'co_argcount', 'co_cellvars', 'co_code', 'co_consts', 'co_filename', 'co_firstlineno', 'co_flags', 'co_freevars', 'co_kwonlyargcount', 'co_lnotab', 'co_name', 'co_names', 'co_nlocals', 'co_posonlyargcount', 'co_stacksize', 'co_varnames', 'replace']
b'd\x00S\x00'
f
C:\Users\shaohan.tian\Desktop\laptop_python_run\wandb_test.py
b'\x00\x01'
1
79
2
2
0
"""
asyncio 入门运行
本质上依旧是单进程单线程的程序,并不能提升运算速度,主要用于处理需要等待的任务,比如网络通讯,核心是event loop
,同时执行的任务只有一个。这里需要深入理解coroutine
和task
概念,函数定义时以async
开头,比如:
# corotine function
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
coro = main() # not running, return coroutine object
asyncio.run(coro) # build event loop and convert coro to task
以下代码中,利用 run 首先将 coroutine main 转换为 task,并告知 eventloop,eventloop 开始运行 main(),接着利用 await 将 say_after 这个 coroutine object 转化为 task,并告知 event loop,开始运行 say_after 这个 task,发现需要先完成 sleep 这个函数,等待 1s 完成,接下来类似情况,总共完成需要 3s 左右。
import time
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
async def main():
print(f"stared at {time.strftime('%X')}")
await say_after(1, 'hello')
# 将say_after包装成task并告知event loop,同时告知等待say_after完成后才开始
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
"""
output:
stared at 10:00:00
hello
world
finished at 10:00:03
[Finished in 3.4s]
"""
在下面的代码中,同时创建任务 1 和 2,在运行 main 的时候,task 异步运行
import time
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return f"{what} -{delay}"
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"stared at {time.strftime('%X')}")
# 可用asyncio.gather()替换
ret1 = await task1
ret2 = await task2
print(ret1,ret2)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
以下代码使用asyncio.gather()
可以更加简洁的方式实现,完成将coroutine object
转为 task 并创建所有任务:
import time
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return f"{what} -{delay}"
async def main():
print(f"stared at {time.strftime('%X')}")
ret = await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world')
)
print(ret) # return list
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
类的多重继承 method resolution order (MRO)
利用自带的mro
属性可以查询对应的继承顺序:
class A:
def say(self):
print("A")
class B(A):
def say(self):
print("B")
class C(A):
pass
class M(C, B):
pass
m = M()
# print(M.__mro__)
print(M.mro())
m.say()
"""
output:
[<class '__main__.M'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
B
[Finished in 284ms]
"""
C3 linearization
- a consistent extended precedence graph
- preservation of local precedence order, and
- 多个继承优先使用靠前的方法,比如 M(A,B),则优先考虑 A
- fitting a monotonicity criterion
- 任何一个 class 必须直接来自于他的直接父类
Python 中常用的 super()函数
super()
是 python 内置的类,super()是建立了一个对象。super()第一个参数决定从哪里开始找寻,第二个参数决定使用这个函数的对象和 mro。
from objprint import op
class Person:
def __init__(self,name):
self.name = name
class Male(Person):
def __init__(self,name):
# super().__init__(name)
# super(Male,self).__init__(name)等价与下一行代码
# 之所以不采用下一行代码,主要原因是:未来可能会改变
# 类的名称或继承方式,修改比较麻烦;同时,super是动态的
Person.__init__(self,name)
self.gender = "male"
m = Male("Peter")
op(m)
super(Male,m).__init__('Bob')
op(m)
"""
output:
[[36m<Male 0x1ad83675130[39m
[32m.gender [39m = 'male',
[32m.name [39m = 'Peter'
[36m> [39m
[Finished in 345ms]](<[36m%3CMale 0x1d32f3d5130[39m
[32m.gender[39m = 'male',
[32m.name[39m = 'Peter'
[36m%3E[39m
[36m<Male 0x1d32f3d5130[39m
[32m.gender[39m = 'male',
[32m.name[39m = 'Bob'
[36m>[39m
[Finished in 294ms]>)
"""
但 super()参数为空的时候,首先寻找自己被定义的 class,作为第一参数放入,接着找寻在哪个函数中被定义,接着将函数的第一个 arg 作为第二个参数放入。
Multiprocessing 基本调用
import multiprocessing
import os
def f():
print(os.getpid())
class MyProcess(multiprocessing.Process):
def run(self):
f()
if __name__ == '__main__':
print(os.getpid())
# method 1
p = multiprocessing.Process(target=f)
p.start()
p.join()
# method 2
p = MyProcess()
p.start()
p.join()
"""
output:
18372
19108
4356
[Finished in 569ms]
"""
变量命名
- 合法的变量名:字母数字下划线,数字不能开头
- 好的变量名:从内容上传递了多少有效信息,常见命名形式:
- 小写字母下划线:lower_underscore
- 大写字母下划线:UPPER_UNDERSCORE,大写下划线基本用来表示常量
- CamelCase,主要用于类的定义
- mixedCase,python 中比较少用
- 特殊命名形式:
for _ in range(10)
表达未使用变量- 类中的方法添加单个下划线,如
def _get_raw(self)
表示弱私有,可正常调用 - 类中的方法添加两个下划线,如
def __get_raw(self)
表示强私有,防止误用 - 类中前后添加双下划线,如
__hashattr__
,表示 python 内部自带的属性 - 变量命名的时候不要覆盖 builtin 函数,文件名不要和模块名相同
迭代器
基本定义
iterable
:an object capable of returning its members one at a time.
iterator
:an object representing a stream of data.
class NodeIter():
def __init__(self,node):
self.curr_node = node
def __next__(self):
if self.curr_node is None:
raise StopIteration
node, self.curr_node = self.curr_node, self.curr_node.next
return node
def __iter__(self):
return self
class Node():
def __init__(self,name):
self.name = name
self.next = None
def __iter__(self):
return NodeIter(self)
node1 = Node("node1")
node2 = Node("node2")
node3 = Node("node3")
node1.next = node2
node2.next = node3
it = iter(node1)
first = next(it)
for node in it:
print(node.name)
"""
output:
node2
node3
[Finished in 248ms]
"""