抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

SimPy Monitoring

Monitor processes

在一个或多个状态变量每次更改时或以离散的间隔监视它们的值,并将其存储在某个位置(例如,内存、数据库或文件中)

在以下例子中,只需使用一个列表,并在每次更改时附加所需的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import simpy

data = [] # This list will hold all collected data

def test_process(env, data):
val = 0
for i in range(5):
val += env.now
data.append(val) # Collect data
yield env.timeout(1)

env = simpy.Environment()
p = env.process(test_process(env, data))
env.run(p)
print('Collected', data)
1
Collected [0, 1, 3, 6, 10]

如果要监视多个变量,可以将 (named)tuples 附加到数据列表中。

如果将数据存储在 NumPy 数组或数据库中,可以通过将数据缓冲在 Python 列表中,并且只向数据库写入较大的块(或完整的数据集)。

Resource usage

资源的监视有很多种可能的情况,比如:

  • 一段时间内的资源利用率,

    • 一次使用资源的进程数
    • container 的 level
    • store 中的 item 数量

    这可以在离散的时间步长中进行监控,也可以在每次发生变化时进行监控

  • 一段时间内(put | get)队列中的进程数(以及平均值)。也可以在离散的时间步长或每次发生变化时进行监控

  • 对于 PreemptiveResource,可能需要测量一段时间内发生抢占的频率

没有直接的访问资源的代码,但可以通过 Monkey-patching 方法收集。

以下是一个示例,演示了如何向在 get/requestput/release 事件之前或之后被调用的资源添加回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from functools import partial, wraps
import simpy

def patch_resource(resource, pre=None, post=None):
"""Patch *resource* so that it calls the callable *pre* before each
put/get/request/release operation and the callable *post* after each
operation. The only argument to these functions is the resource
instance.

"""
def get_wrapper(func):
# Generate a wrapper for put/get/request/release
@wraps(func)
def wrapper(*args, **kwargs):
# This is the actual wrapper
# Call "pre" callback
if pre:
pre(resource)

# Perform actual operation
ret = func(*args, **kwargs)

# Call "post" callback
if post:
post(resource)

return ret
return wrapper

# Replace the original operations with our wrapper
for name in ['put', 'get', 'request', 'release']:
if hasattr(resource, name):
setattr(resource, name, get_wrapper(getattr(resource, name)))

def monitor(data, resource):
"""This is our monitoring callback."""
item = (
resource._env.now, # The current simulation time
resource.count, # The number of users
len(resource.queue), # The number of queued processes
)
data.append(item)

def test_process(env, res):
with res.request() as req:
yield req
yield env.timeout(1)

env = simpy.Environment()

res = simpy.Resource(env, capacity=1)
data = []
# Bind *data* as first argument to monitor()
# see https://docs.python.org/3/library/functools.html#functools.partial
monitor = partial(monitor, data)
patch_resource(res, post=monitor) # Patches (only) this resource instance

p = env.process(test_process(env, res))
env.run(p)

print(data)
1
[(0, 1, 0), (1, 0, 0)]

查看一次有多少个 processes 在等待同一个资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import simpy

class MonitoredResource(simpy.Resource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.data = []

def request(self, *args, **kwargs):
self.data.append((self._env.now, len(self.queue)))
return super().request(*args, **kwargs)

def release(self, *args, **kwargs):
self.data.append((self._env.now, len(self.queue)))
return super().release(*args, **kwargs)

def test_process(env, res):
with res.request() as req:
yield req
yield env.timeout(1)

env = simpy.Environment()

res = MonitoredResource(env, capacity=1)
p1 = env.process(test_process(env, res))
p2 = env.process(test_process(env, res))
env.run()

print(res.data)
1
[(0, 0), (0, 0), (1, 1), (2, 0)]

Event tracing

跟踪事件的创建、触发和处理时间,哪个进程创建了事件,哪些进程等待了事件。

  • Environment.step():处理所有事件
  • Environment.schedule():安排所有事件、插入 SimPy 事件队列中

Environment.step() 跟踪所有处理的事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from functools import partial, wraps
import simpy

def trace(env, callback):
"""Replace the ``step()`` method of *env* with a tracing function
that calls *callbacks* with an events time, priority, ID and its
instance just before it is processed.

"""
def get_wrapper(env_step, callback):
"""Generate the wrapper for env.step()."""
@wraps(env_step)
def tracing_step():
"""Call *callback* for the next event if one exist before
calling ``env.step()``."""
if len(env._queue):
t, prio, eid, event = env._queue[0]
callback(t, prio, eid, event)
return env_step()
return tracing_step

env.step = get_wrapper(env.step, callback)

def monitor(data, t, prio, eid, event):
data.append((t, eid, type(event)))

def test_process(env):
yield env.timeout(1)

data = []
# Bind *data* as first argument to monitor()
# see https://docs.python.org/3/library/functools.html#functools.partial
monitor = partial(monitor, data)

env = simpy.Environment()
trace(env, monitor)

p = env.process(test_process(env))
env.run(until=p)

for d in data:
print(d)
1
2
3
(0, 0, <class 'simpy.events.Initialize'>)
(1, 1, <class 'simpy.events.Timeout'>)
(1, 2, <class 'simpy.events.Process'>)




SimPy 学习