Merge branch '22316-test-fixes'
[arvados.git] / services / workbench2 / src / store / process-logs-panel / process-logs-panel-actions.ts
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource, ContainerRequestState, ContainerStatus } from "models/container-request";
17 import { ContainerState } from "models/container";
18
19 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
20 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{0,}Z/;
21
22 export const processLogsPanelActions = unionize({
23     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
24     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
25     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
26     ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
27 });
28
29 // Max size of logs to fetch in bytes
30 const maxLogFetchSize: number = 128 * 1000;
31
32 type FileWithProgress = {
33     file: CollectionFile;
34     lastByte: number;
35 }
36
37 type SortableLine = {
38     logType: LogEventType,
39     timestamp: string;
40     contents: string;
41 }
42
43 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
44
45 export const setProcessLogsPanelFilter = (filter: string) =>
46     processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
47
48 export const initProcessLogsPanel = (processUuid: string) =>
49     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
50         let process: Process | undefined;
51         try {
52             dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
53             process = getProcess(processUuid)(getState().resources);
54             if (process?.containerRequest?.uuid) {
55                 // Get log file size info
56                 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
57
58                 // Populate lastbyte 0 for each file
59                 const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
60
61                 // Fetch array of LogFragments
62                 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
63
64                 // Populate initial state with filters
65                 const initialState = createInitialLogPanelState(logFiles, logLines);
66                 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
67             }
68         } catch (e) {
69             // On error, populate empty state to allow polling to start
70             const initialState = createInitialLogPanelState([], []);
71             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
72             // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
73             if (e.status !== 404) {
74                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
75             }
76             if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
77                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
78             }
79         }
80     };
81
82 export const pollProcessLogs = (processUuid: string) =>
83     async (dispatch: Dispatch, getState: () => RootState, { logService, containerRequestService }: ServiceRepository) => {
84         try {
85             // Get log panel state and process from store
86             const currentState = getState().processLogsPanel;
87             const process = getProcess(processUuid)(getState().resources);
88
89             // Check if container request is present and initial logs state loaded
90             if (process?.containerRequest?.uuid) {
91
92                 // Perform range request(s) for each file
93                 let logFiles: CollectionFile[] = [];
94                 let logFragments: LogFragment[] = [];
95
96                 if (process.containerRequest.logUuid && Object.keys(currentState.logs).length > 0) {
97                     logFiles = await loadContainerLogFileList(process.containerRequest, logService);
98
99                     // Determine byte to fetch from while filtering unchanged files
100                     const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
101                         // Fetch last byte or 0 for new log files
102                         const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
103
104                         const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
105                         const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
106
107                         if (isNew || isChanged) {
108                             return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
109                         } else {
110                             return acc;
111                         }
112                     }, [] as FileWithProgress[]);
113
114                     logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
115                 }
116
117                 if (process?.container?.state === ContainerState.QUEUED || process?.container?.state === ContainerState.LOCKED) {
118                     const containerStatus: ContainerStatus = await containerRequestService.containerStatus(process?.containerRequest?.uuid, false);
119                     logFragments.push({
120                         logType: LogEventType.SCHEDULING,
121                         contents: [`${new Date().toISOString()} ${containerStatus.schedulingStatus}`],
122                     });
123                 }
124
125                 if (logFragments.length) {
126                     // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
127                     const groupedLogs = groupLogs(logFiles, logFragments);
128                     await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
129                 }
130             }
131             return Promise.resolve();
132         } catch (e) {
133             // Remove log when polling error is handled in some way instead of being ignored
134             console.error("Error occurred in pollProcessLogs:", e);
135             return Promise.reject();
136         }
137     };
138
139 const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
140     const logCollectionContents = await logService.listLogFiles(containerRequest);
141
142     // Filter only root directory files matching log event types which have bytes
143     return logCollectionContents.filter((file): file is CollectionFile => (
144         file.type === CollectionFileType.FILE &&
145         PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
146         file.size > 0
147     ));
148 };
149
150 /**
151  * Loads the contents of each file from each file's lastByte simultaneously
152  *   while respecting the maxLogFetchSize by requesting the start and end
153  *   of the desired block and inserting a snipline.
154  * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
155  * @param logService
156  * @param process
157  * @returns LogFragment[] containing a single LogFragment corresponding to each input file
158  */
159 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
160     (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
161         const requestSize = file.size - lastByte;
162         if (requestSize > maxLogFetchSize) {
163             const chunkSize = Math.floor(maxLogFetchSize / 2);
164             const firstChunkEnd = lastByte + chunkSize - 1;
165             return Promise.all([
166                 logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
167                 logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
168             ] as Promise<(LogFragment)>[]);
169         } else {
170             return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
171         }
172     })).then((res) => {
173         if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
174             // Since allSettled does not pass promise rejection we throw an
175             //   error if every request failed
176             const error = res.find(
177                 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
178             )?.reason;
179             return Promise.reject(error);
180         }
181         return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
182             // Filter out log files with rejected promises
183             //   (Promise.all rejects on any failure)
184             promiseResult.status === 'fulfilled' &&
185             // Filter out files where any fragment is empty
186             //   (prevent incorrect snipline generation or an un-resumable situation)
187             !!promiseResult.value.every(logFragment => logFragment.contents.length)
188         )).map(one => one.value)
189     })).map((logResponseSet) => {
190         // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
191         //   Don't add snip line as a separate line so that sorting won't reorder it
192         for (let i = 1; i < logResponseSet.length; i++) {
193             const fragment = logResponseSet[i - 1];
194             const lastLineIndex = fragment.contents.length - 1;
195             const lastLineContents = fragment.contents[lastLineIndex];
196             const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
197
198             logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
199         }
200
201         // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
202         return logResponseSet.reduce((acc, curr: LogFragment) => ({
203             logType: curr.logType,
204             contents: [...(acc.contents || []), ...curr.contents]
205         }), {} as LogFragment);
206     })
207 );
208
209 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
210     const logs = groupLogs(logFiles, logFragments);
211     const filters = Object.keys(logs);
212     return { filters, logs };
213 }
214
215 /**
216  * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
217  * @param logFiles
218  * @param logFragments
219  * @returns ProcessLogs for the store
220  */
221 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
222     const sortableLogFragments = mergeMultilineLoglines(logFragments);
223
224     const allLogs = mergeSortLogFragments(sortableLogFragments);
225     const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
226
227     const groupedLogs = logFragments.reduce((grouped, fragment) => ({
228         ...grouped,
229         [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
230     }), {});
231
232     return {
233         [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
234         [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
235         ...groupedLogs,
236     }
237 };
238
239 /**
240  * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
241  *   If there is no previous line (first line has no timestamp), the line is deleted.
242  *   Only used for combined logs that need sorting by timestamp after merging
243  * @param logFragments
244  * @returns Modified LogFragment[]
245  */
246 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
247     logFragments.map((fragment) => {
248         // Avoid altering the original fragment copy
249         let fragmentCopy: LogFragment = {
250             logType: fragment.logType,
251             contents: [...fragment.contents],
252         }
253         // Merge any non-timestamped lines in sortable log types with previous line
254         if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
255             for (let i = 0; i < fragmentCopy.contents.length; i++) {
256                 const lineContents = fragmentCopy.contents[i];
257                 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
258                     // Partial line without timestamp detected
259                     if (i > 0) {
260                         // If not first line, copy line to previous line
261                         const previousLineContents = fragmentCopy.contents[i - 1];
262                         const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
263                         fragmentCopy.contents[i - 1] = newPreviousLineContents;
264                     }
265                     // Delete the current line and prevent iterating
266                     fragmentCopy.contents.splice(i, 1);
267                     i--;
268                 }
269             }
270         }
271         return fragmentCopy;
272     })
273 );
274
275 /**
276  * Merges log lines of different types and sorts types that contain timestamps (are sortable)
277  * @param logFragments
278  * @returns string[] of merged and sorted log lines
279  */
280 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
281     const sortableFragments = logFragments
282         .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
283
284     const nonSortableLines = fragmentsToLines(logFragments
285         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
286         .sort((a, b) => (a.logType.localeCompare(b.logType))));
287
288     return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
289 };
290
291 /**
292  * Performs merge and sort of input log fragment lines
293  * @param logFragments set of sortable log fragments to be merged and sorted
294  * @returns A string array containing all lines, sorted by timestamp and
295  *          preserving line ordering and type grouping when timestamps match
296  */
297 const sortLogFragments = (logFragments: LogFragment[]): string[] => {
298     const linesWithType: SortableLine[] = logFragments
299         // Map each logFragment into an array of SortableLine
300         .map((fragment: LogFragment): SortableLine[] => (
301             fragment.contents.map((singleLine: string) => {
302                 const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
303                 const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
304                 return {
305                     logType: fragment.logType,
306                     timestamp: timestamp,
307                     contents: singleLine,
308                 };
309             })
310         // Merge each array of SortableLine into single array
311         )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
312             [...acc, ...lines]
313         ), [] as SortableLine[]);
314
315     return linesWithType
316         .sort(sortableLineSortFunc)
317         .map(lineWithType => lineWithType.contents);
318 };
319
320 /**
321  * Sort func to sort lines
322  *   Preserves original ordering of lines from the same source
323  *   Stably orders lines of differing type but same timestamp
324  *     (produces a block of same-timestamped lines of one type before a block
325  *     of same timestamped lines of another type for readability)
326  *   Sorts all other lines by contents (ie by timestamp)
327  */
328 const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
329     if (a.logType === b.logType) {
330         return 0;
331     } else if (a.timestamp === b.timestamp) {
332         return a.logType.localeCompare(b.logType);
333     } else {
334         return a.contents.localeCompare(b.contents);
335     }
336 };
337
338 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
339     fragments.reduce((acc, fragment: LogFragment) => (
340         acc.concat(...fragment.contents)
341     ), [] as string[])
342 );
343
344 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
345     return logFiles.find((file) => (file.name.startsWith(key)))?.size
346 };
347
348 export const navigateToLogCollection = (uuid: string) =>
349     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
350         try {
351             await services.collectionService.get(uuid);
352             dispatch<any>(navigateTo(uuid));
353         } catch {
354             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
355         }
356     };
357
358 const ALL_FILTER_TYPE = 'All logs';
359
360 const MAIN_FILTER_TYPE = 'Main logs';
361 const MAIN_EVENT_TYPES = [
362     LogEventType.CRUNCH_RUN,
363     LogEventType.STDERR,
364     LogEventType.STDOUT,
365     LogEventType.SCHEDULING,
366 ];
367
368 const PROCESS_PANEL_LOG_EVENT_TYPES = [
369     LogEventType.ARV_MOUNT,
370     LogEventType.CRUNCH_RUN,
371     LogEventType.CRUNCHSTAT,
372     LogEventType.DISPATCH,
373     LogEventType.HOSTSTAT,
374     LogEventType.NODE_INFO,
375     LogEventType.STDERR,
376     LogEventType.STDOUT,
377     LogEventType.CONTAINER,
378     LogEventType.KEEPSTORE,
379     LogEventType.SCHEDULING
380 ];
381
382 const NON_SORTED_LOG_TYPES = [
383     LogEventType.NODE_INFO,
384     LogEventType.CONTAINER,
385 ];