// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource, ContainerRequestState, ContainerStatus } from "models/container-request";
17 import { ContainerState } from "models/container";
// Divider appended to the last line of a truncated head chunk when an
// over-size log fetch skips the middle of a file; it is glued onto an
// existing line (not inserted as its own line) so sorting cannot reorder it.
const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
// Matches an ISO-8601 UTC timestamp at the start of a log line,
// e.g. "2024-01-02T03:04:05.678Z". Lines lacking this prefix are treated as
// continuations of the previous line when merging multiline log output.
const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{0,}Z/;
22 export const processLogsPanelActions = unionize({
23 RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
24 INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
25 SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
26 ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
29 // Max size of logs to fetch in bytes
30 const maxLogFetchSize: number = 128 * 1000;
32 type FileWithProgress = {
38 logType: LogEventType,
43 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
45 export const setProcessLogsPanelFilter = (filter: string) =>
46 processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
48 export const initProcessLogsPanel = (processUuid: string) =>
49 async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
50 let process: Process | undefined;
52 dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
53 process = getProcess(processUuid)(getState().resources);
54 if (process?.containerRequest?.uuid) {
55 // Get log file size info
56 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
58 // Populate lastbyte 0 for each file
59 const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
61 // Fetch array of LogFragments
62 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
64 // Populate initial state with filters
65 const initialState = createInitialLogPanelState(logFiles, logLines);
66 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
69 // On error, populate empty state to allow polling to start
70 const initialState = createInitialLogPanelState([], []);
71 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
72 // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
73 if (e.status !== 404) {
74 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
76 if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
77 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
82 export const pollProcessLogs = (processUuid: string) =>
83 async (dispatch: Dispatch, getState: () => RootState, { logService, containerRequestService }: ServiceRepository) => {
85 // Get log panel state and process from store
86 const currentState = getState().processLogsPanel;
87 const process = getProcess(processUuid)(getState().resources);
89 // Check if container request is present and initial logs state loaded
90 if (process?.containerRequest?.uuid) {
92 // Perform range request(s) for each file
93 let logFiles: CollectionFile[] = [];
94 let logFragments: LogFragment[] = [];
96 if (process.containerRequest.logUuid && Object.keys(currentState.logs).length > 0) {
97 logFiles = await loadContainerLogFileList(process.containerRequest, logService);
99 // Determine byte to fetch from while filtering unchanged files
100 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
101 // Fetch last byte or 0 for new log files
102 const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
104 const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
105 const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
107 if (isNew || isChanged) {
108 return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
112 }, [] as FileWithProgress[]);
114 logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
117 if (process?.container?.state === ContainerState.QUEUED || process?.container?.state === ContainerState.LOCKED) {
118 const containerStatus: ContainerStatus = await containerRequestService.containerStatus(process?.containerRequest?.uuid, false);
120 logType: LogEventType.SCHEDULING,
121 contents: [`${new Date().toISOString()} ${containerStatus.schedulingStatus}`],
125 if (logFragments.length) {
126 // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
127 const groupedLogs = groupLogs(logFiles, logFragments);
128 await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
131 return Promise.resolve();
133 // Remove log when polling error is handled in some way instead of being ignored
134 console.error("Error occurred in pollProcessLogs:", e);
135 return Promise.reject();
139 const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
140 const logCollectionContents = await logService.listLogFiles(containerRequest);
142 // Filter only root directory files matching log event types which have bytes
143 return logCollectionContents.filter((file): file is CollectionFile => (
144 file.type === CollectionFileType.FILE &&
145 PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
151 * Loads the contents of each file from each file's lastByte simultaneously
152 * while respecting the maxLogFetchSize by requesting the start and end
153 * of the desired block and inserting a snipline.
154 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
157 * @returns LogFragment[] containing a single LogFragment corresponding to each input file
159 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
160 (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
161 const requestSize = file.size - lastByte;
162 if (requestSize > maxLogFetchSize) {
163 const chunkSize = Math.floor(maxLogFetchSize / 2);
164 const firstChunkEnd = lastByte + chunkSize - 1;
166 logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
167 logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
168 ] as Promise<(LogFragment)>[]);
170 return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
173 if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
174 // Since allSettled does not pass promise rejection we throw an
175 // error if every request failed
176 const error = res.find(
177 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
179 return Promise.reject(error);
181 return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
182 // Filter out log files with rejected promises
183 // (Promise.all rejects on any failure)
184 promiseResult.status === 'fulfilled' &&
185 // Filter out files where any fragment is empty
186 // (prevent incorrect snipline generation or an un-resumable situation)
187 !!promiseResult.value.every(logFragment => logFragment.contents.length)
188 )).map(one => one.value)
189 })).map((logResponseSet) => {
190 // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
191 // Don't add snip line as a separate line so that sorting won't reorder it
192 for (let i = 1; i < logResponseSet.length; i++) {
193 const fragment = logResponseSet[i - 1];
194 const lastLineIndex = fragment.contents.length - 1;
195 const lastLineContents = fragment.contents[lastLineIndex];
196 const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
198 logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
201 // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
202 return logResponseSet.reduce((acc, curr: LogFragment) => ({
203 logType: curr.logType,
204 contents: [...(acc.contents || []), ...curr.contents]
205 }), {} as LogFragment);
209 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
210 const logs = groupLogs(logFiles, logFragments);
211 const filters = Object.keys(logs);
212 return { filters, logs };
216 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
218 * @param logFragments
219 * @returns ProcessLogs for the store
221 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
222 const sortableLogFragments = mergeMultilineLoglines(logFragments);
224 const allLogs = mergeSortLogFragments(sortableLogFragments);
225 const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
227 const groupedLogs = logFragments.reduce((grouped, fragment) => ({
229 [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
233 [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
234 [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
240 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
241 * If there is no previous line (first line has no timestamp), the line is deleted.
242 * Only used for combined logs that need sorting by timestamp after merging
243 * @param logFragments
244 * @returns Modified LogFragment[]
246 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
247 logFragments.map((fragment) => {
248 // Avoid altering the original fragment copy
249 let fragmentCopy: LogFragment = {
250 logType: fragment.logType,
251 contents: [...fragment.contents],
253 // Merge any non-timestamped lines in sortable log types with previous line
254 if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
255 for (let i = 0; i < fragmentCopy.contents.length; i++) {
256 const lineContents = fragmentCopy.contents[i];
257 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
258 // Partial line without timestamp detected
260 // If not first line, copy line to previous line
261 const previousLineContents = fragmentCopy.contents[i - 1];
262 const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
263 fragmentCopy.contents[i - 1] = newPreviousLineContents;
265 // Delete the current line and prevent iterating
266 fragmentCopy.contents.splice(i, 1);
276 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
277 * @param logFragments
278 * @returns string[] of merged and sorted log lines
280 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
281 const sortableFragments = logFragments
282 .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
284 const nonSortableLines = fragmentsToLines(logFragments
285 .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
286 .sort((a, b) => (a.logType.localeCompare(b.logType))));
288 return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
292 * Performs merge and sort of input log fragment lines
293 * @param logFragments set of sortable log fragments to be merged and sorted
294 * @returns A string array containing all lines, sorted by timestamp and
295 * preserving line ordering and type grouping when timestamps match
297 const sortLogFragments = (logFragments: LogFragment[]): string[] => {
298 const linesWithType: SortableLine[] = logFragments
299 // Map each logFragment into an array of SortableLine
300 .map((fragment: LogFragment): SortableLine[] => (
301 fragment.contents.map((singleLine: string) => {
302 const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
303 const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
305 logType: fragment.logType,
306 timestamp: timestamp,
307 contents: singleLine,
310 // Merge each array of SortableLine into single array
311 )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
313 ), [] as SortableLine[]);
316 .sort(sortableLineSortFunc)
317 .map(lineWithType => lineWithType.contents);
321 * Sort func to sort lines
322 * Preserves original ordering of lines from the same source
323 * Stably orders lines of differing type but same timestamp
324 * (produces a block of same-timestamped lines of one type before a block
325 * of same timestamped lines of another type for readability)
326 * Sorts all other lines by contents (ie by timestamp)
328 const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
329 if (a.logType === b.logType) {
331 } else if (a.timestamp === b.timestamp) {
332 return a.logType.localeCompare(b.logType);
334 return a.contents.localeCompare(b.contents);
338 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
339 fragments.reduce((acc, fragment: LogFragment) => (
340 acc.concat(...fragment.contents)
344 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
345 return logFiles.find((file) => (file.name.startsWith(key)))?.size
348 export const navigateToLogCollection = (uuid: string) =>
349 async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
351 await services.collectionService.get(uuid);
352 dispatch<any>(navigateTo(uuid));
354 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
358 const ALL_FILTER_TYPE = 'All logs';
360 const MAIN_FILTER_TYPE = 'Main logs';
361 const MAIN_EVENT_TYPES = [
362 LogEventType.CRUNCH_RUN,
365 LogEventType.SCHEDULING,
368 const PROCESS_PANEL_LOG_EVENT_TYPES = [
369 LogEventType.ARV_MOUNT,
370 LogEventType.CRUNCH_RUN,
371 LogEventType.CRUNCHSTAT,
372 LogEventType.DISPATCH,
373 LogEventType.HOSTSTAT,
374 LogEventType.NODE_INFO,
377 LogEventType.CONTAINER,
378 LogEventType.KEEPSTORE,
379 LogEventType.SCHEDULING
// Log types whose lines carry no leading timestamps; they are excluded from
// multiline merging and timestamp sorting, and are emitted ahead of the
// sorted lines in merged views.
const NON_SORTED_LOG_TYPES = [
    LogEventType.NODE_INFO,
    LogEventType.CONTAINER,