Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 125 additions & 125 deletions activitysim/abm/models/trip_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,60 +319,57 @@ def schedule_trips_in_leg(
result_list.append(choices)
trips = trips[~no_scheduling]

with chunk.chunk_log(trace_label):

# add next_trip_id temp column (temp as trips is now a copy, as result of slicing)
trips = trips.sort_index()
trips['next_trip_id'] = np.roll(trips.index, -1 if outbound else 1)
is_final = (trips.trip_num == trips.trip_count) if outbound else (trips.trip_num == 1)
trips.next_trip_id = trips.next_trip_id.where(~is_final, NO_TRIP_ID)

# iterate over outbound trips in ascending trip_num order, skipping the initial trip
# iterate over inbound trips in descending trip_num order, skipping the final trip
first_trip_in_leg = True
for i in range(trips.trip_num.min(), trips.trip_num.max() + 1):

if outbound:
nth_trips = trips[trips.trip_num == i]
else:
nth_trips = trips[trips.trip_num == trips.trip_count - i]

nth_trace_label = tracing.extend_trace_label(trace_label, 'num_%s' % i)

with chunk.chunk_log(nth_trace_label, chunk_tag=trace_label):
choices = schedule_nth_trips(
nth_trips,
probs_spec,
model_settings,
first_trip_in_leg=first_trip_in_leg,
report_failed_trips=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=nth_trace_label)

# if outbound, this trip's depart constrains next trip's earliest depart option
# if inbound, we are handling in reverse order, so it constrains latest depart instead
ADJUST_NEXT_DEPART_COL = 'earliest' if outbound else 'latest'

# most initial departure (when no choice was made because all probs were zero)
if is_last_iteration and (failfix == FAILFIX_CHOOSE_MOST_INITIAL):
choices = choices.reindex(nth_trips.index)
logger.warning("%s coercing %s depart choices to most initial" %
(nth_trace_label, choices.isna().sum()))
choices = choices.fillna(trips[ADJUST_NEXT_DEPART_COL])

# adjust allowed depart range of next trip
has_next_trip = (nth_trips.next_trip_id != NO_TRIP_ID)
if has_next_trip.any():
next_trip_ids = nth_trips.next_trip_id[has_next_trip]
# patch choice any trips with next_trips that weren't scheduled
trips.loc[next_trip_ids, ADJUST_NEXT_DEPART_COL] = \
choices.reindex(next_trip_ids.index).fillna(trips[ADJUST_NEXT_DEPART_COL]).values

result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

first_trip_in_leg = False
# add next_trip_id temp column (temp as trips is now a copy, as result of slicing)
trips = trips.sort_index()
trips['next_trip_id'] = np.roll(trips.index, -1 if outbound else 1)
is_final = (trips.trip_num == trips.trip_count) if outbound else (trips.trip_num == 1)
trips.next_trip_id = trips.next_trip_id.where(~is_final, NO_TRIP_ID)

# iterate over outbound trips in ascending trip_num order, skipping the initial trip
# iterate over inbound trips in descending trip_num order, skipping the final trip
first_trip_in_leg = True
for i in range(trips.trip_num.min(), trips.trip_num.max() + 1):

if outbound:
nth_trips = trips[trips.trip_num == i]
else:
nth_trips = trips[trips.trip_num == trips.trip_count - i]

nth_trace_label = tracing.extend_trace_label(trace_label, 'num_%s' % i)

choices = schedule_nth_trips(
nth_trips,
probs_spec,
model_settings,
first_trip_in_leg=first_trip_in_leg,
report_failed_trips=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=nth_trace_label)

# if outbound, this trip's depart constrains next trip's earliest depart option
# if inbound, we are handling in reverse order, so it constrains latest depart instead
ADJUST_NEXT_DEPART_COL = 'earliest' if outbound else 'latest'

# most initial departure (when no choice was made because all probs were zero)
if is_last_iteration and (failfix == FAILFIX_CHOOSE_MOST_INITIAL):
choices = choices.reindex(nth_trips.index)
logger.warning("%s coercing %s depart choices to most initial" %
(nth_trace_label, choices.isna().sum()))
choices = choices.fillna(trips[ADJUST_NEXT_DEPART_COL])

# adjust allowed depart range of next trip
has_next_trip = (nth_trips.next_trip_id != NO_TRIP_ID)
if has_next_trip.any():
next_trip_ids = nth_trips.next_trip_id[has_next_trip]
# patch choice any trips with next_trips that weren't scheduled
trips.loc[next_trip_ids, ADJUST_NEXT_DEPART_COL] = \
choices.reindex(next_trip_ids.index).fillna(trips[ADJUST_NEXT_DEPART_COL]).values

result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

first_trip_in_leg = False

if len(result_list) > 1:
choices = pd.concat(result_list)
Expand All @@ -381,7 +378,7 @@ def schedule_trips_in_leg(


def run_trip_scheduling(
trips,
trips_chunk,
tours,
probs_spec,
model_settings,
Expand All @@ -398,40 +395,38 @@ def run_trip_scheduling(
# num_choosers = (is_inbound_chooser | is_outbound_chooser).sum()

result_list = []
for i, trips_chunk, chunk_trace_label \
in chunk.adaptive_chunked_choosers_by_chunk_id(trips, chunk_size, trace_label, chunk_tag):

if trips_chunk.outbound.any():
leg_chunk = trips_chunk[trips_chunk.outbound]
leg_trace_label = tracing.extend_trace_label(chunk_trace_label, 'outbound')
choices = \
schedule_trips_in_leg(
outbound=True,
trips=leg_chunk,
probs_spec=probs_spec,
model_settings=model_settings,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=leg_trace_label)
result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

if (~trips_chunk.outbound).any():
leg_chunk = trips_chunk[~trips_chunk.outbound]
leg_trace_label = tracing.extend_trace_label(chunk_trace_label, 'inbound')
choices = \
schedule_trips_in_leg(
outbound=False,
trips=leg_chunk,
probs_spec=probs_spec,
model_settings=model_settings,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=leg_trace_label)
result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

if trips_chunk.outbound.any():
leg_chunk = trips_chunk[trips_chunk.outbound]
leg_trace_label = tracing.extend_trace_label(trace_label, 'outbound')
choices = \
schedule_trips_in_leg(
outbound=True,
trips=leg_chunk,
probs_spec=probs_spec,
model_settings=model_settings,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=leg_trace_label)
result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

if (~trips_chunk.outbound).any():
leg_chunk = trips_chunk[~trips_chunk.outbound]
leg_trace_label = tracing.extend_trace_label(trace_label, 'inbound')
choices = \
schedule_trips_in_leg(
outbound=False,
trips=leg_chunk,
probs_spec=probs_spec,
model_settings=model_settings,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
trace_label=leg_trace_label)
result_list.append(choices)

chunk.log_df(trace_label, f'result_list', result_list)

choices = pd.concat(result_list)

Expand Down Expand Up @@ -526,43 +521,48 @@ def trip_scheduling(
assert max_iterations > 0

choices_list = []
i = 0
while (i < max_iterations) and not trips_df.empty:

i += 1
is_last_iteration = (i == max_iterations)

trace_label_i = tracing.extend_trace_label(trace_label, "i%s" % i)
logger.info("%s scheduling %s trips", trace_label_i, trips_df.shape[0])

# first iteration gets its own chunk_tag and all subsequent iterations are aggregated
# subsequent iterations on failed trips have somewhat different overhead profile than initial batch
chunk_tag = "trip_scheduling_1" if i == 1 else "trip_scheduling_n"

choices = \
run_trip_scheduling(
trips_df,
tours,
probs_spec,
model_settings,
estimator=estimator,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
chunk_size=chunk_size,
chunk_tag=chunk_tag,
trace_label=trace_label_i)

# boolean series of trips whose individual trip scheduling failed
failed = choices.reindex(trips_df.index).isnull()
logger.info("%s %s failed", trace_label_i, failed.sum())

if not is_last_iteration:
# boolean series of trips whose leg scheduling failed
failed_cohorts = failed_trip_cohorts(trips_df, failed)
trips_df = trips_df[failed_cohorts]
choices = choices[~failed_cohorts]

choices_list.append(choices)
for chunk_i, trips_chunk, chunk_trace_label in chunk.adaptive_chunked_choosers_by_chunk_id(trips_df,
chunk_size,
trace_label,
trace_label):

i = 0
while (i < max_iterations) and not trips_chunk.empty:

# only chunk log first iteration since memory use declines with each iteration
with chunk.chunk_log(trace_label) if i == 0 else chunk.chunk_log_skip():

i += 1
is_last_iteration = (i == max_iterations)

trace_label_i = tracing.extend_trace_label(trace_label, "i%s" % i)
logger.info("%s scheduling %s trips within chunk %s", trace_label_i, trips_chunk.shape[0], chunk_i)

choices = \
run_trip_scheduling(
trips_chunk,
tours,
probs_spec,
model_settings,
estimator=estimator,
is_last_iteration=is_last_iteration,
trace_hh_id=trace_hh_id,
chunk_size=chunk_size,
chunk_tag=trace_label,
trace_label=trace_label_i)

# boolean series of trips whose individual trip scheduling failed
failed = choices.reindex(trips_chunk.index).isnull()
logger.info("%s %s failed", trace_label_i, failed.sum())

if not is_last_iteration:
# boolean series of trips whose leg scheduling failed
failed_cohorts = failed_trip_cohorts(trips_chunk, failed)
trips_chunk = trips_chunk[failed_cohorts]
choices = choices[~failed_cohorts]

choices_list.append(choices)

trips_df = trips.to_frame()

Expand Down
29 changes: 25 additions & 4 deletions activitysim/core/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,9 @@ def log_rss(trace_label, force=False):
hwm_trace_label = f"{trace_label}.log_rss"

if chunk_training_mode() == MODE_PRODUCTION:
trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
# FIXME - this trace_memory_info call slows things down a lot so it is turned off for now
# trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
# mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
return

rss, uss = mem.trace_memory_info(hwm_trace_label)
Expand Down Expand Up @@ -624,7 +625,12 @@ class ChunkSizer(object):
def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0):

self.depth = len(CHUNK_SIZERS) + 1
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

if self.depth > 1:
# nested chunkers should be unchunked
Expand Down Expand Up @@ -755,7 +761,14 @@ def adaptive_rows_per_chunk(self, i):

prev_rss = self.rss
prev_uss = self.uss
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if chunk_training_mode() != MODE_PRODUCTION:

if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

self.headroom = self.available_headroom(self.uss if chunk_metric() == USS else self.rss)

Expand Down Expand Up @@ -904,6 +917,14 @@ def chunk_log(trace_label, chunk_tag=None, base=False):
chunk_sizer.close()


@contextmanager
def chunk_log_skip():
    """
    Do-nothing context manager.

    Yields exactly once, with no value, and performs no setup or teardown,
    so the body of the ``with`` statement runs unchanged and any exception
    raised inside it propagates to the caller.

    Intended for call sites that need a context manager either way, e.g.
    ``with chunk_log(trace_label) if cond else chunk_log_skip():``.
    """
    yield


def adaptive_chunked_choosers(choosers, chunk_size, trace_label, chunk_tag=None):

# generator to iterate over choosers
Expand Down
Loading